/////////////////////////////////////////////////////////////////////////////////////////////////////// #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////////////////////////////////// #include "rocketmq/DefaultMQProducer.h" #include "rocketmq/DefaultMQPushConsumer.h" #include "rocketmq/MQMessageListener.h" #include "rocketmq/MQMessageExt.h" #include "rocketmq/MQMessageQueue.h" #include "rocketmq/MQSelector.h" #include "rocketmq/SendResult.h" #include "rocketmq/SessionCredentials.h" #include "rocketmq.h" #include "nlohmann/json.hpp" #include "log4.h" #include "interface.h" #include "front.h" #include "../../client2.h" ////////////////////////////////////////////////////////////////////////////////////////////////////////// using namespace std; using nlohmann::json; ////////////////////////////////////////////////////////////////////////////////////////////////////////// #ifndef nullptr #define nullptr NULL #endif /////////////////////////////////////////////////////////////////////////////////////////////////////////// std::mutex queue_data_list_mutex; std::list queue_data_list; static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者 std::mutex devidx_lock; std::unordered_map devIdxMap;//实时数据用的idx /////////////////////////////////////////////////////////////////////////////////////////////////////////// //前置进程 extern unsigned int g_node_id; extern std::string subdir; extern std::string FRONT_INST; //生产者线程控制 extern uint32_t g_mqproducer_blocked_times; //初始化标志 extern int INITFLAG; //测试用的终端数组 extern std::vector TESTARRAY; ////////////////////////////////////////////////////////////////////////////////////////////////////////外部文件函数声明 extern void execute_bash(std::string fun,int process_num,std::string type); extern int recall_json_handle_from_mq(const std::string& body); 
//////////////////////////////////////////////////////////////////////////////////////////////////////本文件函数向前声明 bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type); std::string prepare_update(const std::string& code_str, const terminal_dev& json_data,const std::string& guid); bool writeToFile(const std::string& filePath, const std::string& xmlContent); //////////////////////////////////////////////////////////////////////////////////////////////////////消费起始控制 static const int64_t G_APP_START_MS = []() -> int64_t { using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()).count(); }(); static const int64_t G_START_SKEW_MS = 1000; // 容错 1s,可按需调整 static inline bool should_process_after_start(const rocketmq::MQMessageExt& msg) { const int64_t born_ts = static_cast(msg.getBornTimestamp()); return born_ts >= (G_APP_START_MS - G_START_SKEW_MS); } ////////////////////////////////////////////////////////////////////////////////////////////////////// namespace rocketmq { //----------------------------------------------------------------------------- // RocketMQConsumer 方法实现 //----------------------------------------------------------------------------- RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup, const std::string& nameServer) : consumer_(consumerGroup) , listener_(nullptr) { // 设置 NameServer 地址 consumer_.setNamesrvAddr(nameServer); // 设置默认的会话凭证 consumer_.setSessionCredentials(G_MQCONSUMER_ACCESSKEY, G_MQCONSUMER_SECRETKEY, G_MQCONSUMER_CHANNEL); // 初始化内部监听器 listener_ = new InternalListener(this); } void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallback callback) { // 调用 C++ 接口的订阅方法,subExpression 参数支持 Tag 过滤,如 "TagA || TagB" consumer_.subscribe(topic, tag); std::cout << "[RocketMQConsumer] 已订阅 Topic: " << topic << ", Tag 表达式: " << tag << std::endl; // 将 topic:tag 作为键,保存回调函数 std::string key = topic + ":" + tag; std::lock_guard 
lock(callbackMutex_); callbackMap_[key] = callback; } void RocketMQConsumer::start() { // 注册消息监听器 consumer_.registerMessageListener(listener_); // 启动消费者 try { consumer_.start(); std::cout << "[RocketMQConsumer] Consumer 已启动,等待消息..." << std::endl; } catch (const MQClientException& e) { std::cerr << "[RocketMQConsumer] 启动失败: " << e.what() << std::endl; throw; } } void RocketMQConsumer::shutdown() { try { consumer_.shutdown(); std::cout << "[RocketMQConsumer] Consumer 已关闭。" << std::endl; } catch (const MQClientException& e) { std::cerr << "[RocketMQConsumer] 关闭失败: " << e.what() << std::endl; } } RocketMQConsumer::~RocketMQConsumer() { // 先关闭消费者 try { consumer_.shutdown(); } catch (...) { // 忽略异常 } // 清理监听器 delete listener_; listener_ = nullptr; std::cout << "[RocketMQConsumer] Consumer 销毁完毕。" << std::endl; } //----------------------------------------------------------------------------- // RocketMQProducer 方法实现 //----------------------------------------------------------------------------- RocketMQProducer::RocketMQProducer(const std::string& groupName, const std::string& nameServer) : producer_(groupName) { // 设置 NameServer 地址 producer_.setNamesrvAddr(nameServer); // 设置默认的会话凭证 producer_.setSessionCredentials(G_MQPRODUCER_ACCESSKEY, G_MQPRODUCER_SECRETKEY, ""); // 启动生产者 try { producer_.start(); std::cout << "[RocketMQProducer] Producer 已启动。" << std::endl; } catch (const MQClientException& e) { std::cerr << "[RocketMQProducer] 启动失败: " << e.what() << std::endl; throw; } } void RocketMQProducer::sendMessage(const std::string& body, const std::string& topic, const std::string& tags, const std::string& keys) { try { // 创建消息对象 MQMessage msg(topic, tags, keys, body); // 同步发送 SendResult result = producer_.send(msg); std::cout << "[RocketMQProducer] 消息发送成功. 
" << "Topic=" << topic << ", MsgID=" << result.getMsgId() << ", Status=" << result.getSendStatus() << std::endl; } catch (const MQClientException& e) { std::cerr << "[RocketMQProducer] 发送失败: " << e.what() << std::endl; // 根据需要进行重试或日志记录 } catch (const std::exception& e) { std::cerr << "[RocketMQProducer] 异常: " << e.what() << std::endl; } catch (...) { std::cerr << "[RocketMQProducer] 未知错误,消息发送失败。" << std::endl; } } void RocketMQProducer::sendMessageOrderly(const std::string& body, const std::string& topic, const std::string& tags, const std::string& keys) { try { // 创建消息对象 MQMessage msg(topic, tags, keys, body); // 使用轮询队列选择器进行顺序发送 SendResult result = producer_.send(msg, &selector_, nullptr); std::cout << "[RocketMQProducer] 顺序消息发送成功. " << "Topic=" << topic << ", MsgID=" << result.getMsgId() << ", Status=" << result.getSendStatus() << std::endl; } catch (const MQClientException& e) { std::cerr << "[RocketMQProducer] 顺序发送失败: " << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "[RocketMQProducer] 异常: " << e.what() << std::endl; } catch (...) 
{ std::cerr << "[RocketMQProducer] 未知错误,顺序消息发送失败。" << std::endl; } } RocketMQProducer::~RocketMQProducer() { try { producer_.shutdown(); std::cout << "[RocketMQProducer] Producer 已关闭。" << std::endl; } catch (const MQClientException& e) { std::cerr << "[RocketMQProducer] 关闭失败: " << e.what() << std::endl; } } } // namespace rocketmq ///////////////////////////////////////////////////////////////////////////////////////////////////生产者接口 // 初始化生产者 void InitializeProducer(rocketmq::RocketMQProducer*& producer) { if (producer == nullptr) { try { producer = new rocketmq::RocketMQProducer(G_ROCKETMQ_PRODUCER, G_MQPRODUCER_IPPORT); } catch (const std::exception& e) { std::cerr << "[InitializeProducer] Failed to initialize producer: " << e.what() << std::endl; throw; } } } // 关闭并销毁生产者(在程序结束时调用一次) void ShutdownAndDestroyProducer() { if (g_producer != NULL) { delete g_producer; g_producer = NULL; } } // 使用 C++ 接口封装的 RocketMQProducer 类 void rocketmq_producer_send(rocketmq::RocketMQProducer* producer, const std::string& body, const std::string& topic, const std::string& tags, const std::string& keys) { if (!producer) { std::cerr << "[rocketmq_producer_send] producer 不可用,未初始化\n"; return; } //const std::string& tags = G_ROCKETMQ_TAG; //const std::string& keys = G_ROCKETMQ_KEY; try { producer->sendMessage(body, topic, tags, keys); } catch (const std::exception& e) { std::cerr << "[rocketmq_producer_send] 发送失败: " << e.what() << std::endl; DIY_ERRORLOG("process", "【ERROR】前置的%d号进程 MQ发送失败", g_front_seg_index); } } //mq发送接口 void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer) { static std::string topic; static std::string cfg_His_tp; static std::string cfg_PLT_tp; static std::string cfg_PST_tp; static std::string cfg_Evt_tp; static std::string cfg_Alm_tp; static std::string cfg_Rt_tp; static bool init = false; if (!init) { cfg_His_tp = TOPIC_STAT; cfg_PLT_tp = TOPIC_PLT; cfg_PST_tp = TOPIC_PST; cfg_Evt_tp = TOPIC_EVENT; cfg_Alm_tp = TOPIC_ALARM; cfg_Rt_tp = 
TOPIC_RTDATA; init = true; } std::string key = data.key; std::string tag = data.tag; std::string senddata = data.strText; if (data.strTopic == "HISDATA") { topic = cfg_His_tp; } else if (data.strTopic == "PLT") { topic = cfg_PLT_tp; } else if (data.strTopic == "PST") { topic = cfg_PST_tp; } else if (data.strTopic == "Alm") { topic = cfg_Alm_tp; } else if (data.strTopic == "RTDATA") { topic = cfg_Rt_tp; } else { topic = data.strTopic; } rocketmq_producer_send(producer,senddata,topic,tag,key); } /////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理 bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& idx){ json root; try { root = json::parse(body); } catch (const std::exception& e) { std::cerr << "Failed to parse JSON message: " << e.what() << std::endl; return false; } // 提取 "messageBody" 字符串 if (!root.contains("messageBody") || !root["messageBody"].is_string()) { std::cerr << "'messageBody' is missing or not a string." << std::endl; return false; } std::string messageBodyStr = root["messageBody"]; if (messageBodyStr.empty()) { std::cerr << "'messageBody' is empty." << std::endl; return false; } // 解析 "messageBody" 中的 JSON 字符串 json messageBody; try { messageBody = json::parse(messageBodyStr); } catch (const std::exception& e) { std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl; return false; } // 检查并提取字段 if (!messageBody.contains("devSeries") || !messageBody.contains("line") || !messageBody.contains("realData") || !messageBody.contains("soeData") || !messageBody.contains("limit")|| !messageBody.contains("idx")) { std::cerr << "Missing expected fields in 'messageBody'." 
<< std::endl; return false; } try { devSeries = messageBody["devSeries"].get(); line = messageBody["line"].get(); realData = messageBody["realData"].get(); soeData = messageBody["soeData"].get(); limit = messageBody["limit"].get(); idx = messageBody["idx"].get(); } catch (const std::exception& e) { std::cerr << "Type error while extracting fields: " << e.what() << std::endl; return false; } // 提取 guid std::string guid; if (messageBody.contains("guid") && messageBody["guid"].is_string()) { guid = messageBody["guid"].get(); } // 回复:执行结果直接看实时数据,不需要再回复,1是收到消息 if (!guid.empty()) { send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到三秒数据指令"); } return true; } bool parseJsonMessageSET(const std::string& json_str) { json root; try { root = json::parse(json_str); } catch (const std::exception& e) { std::cout << "Error parsing JSON: " << e.what() << std::endl; return false; } // [MOD] 允许 messageBody 既可为字符串也可为对象;保持原有错误打印 // ----- MOD BEGIN: messageBody 兼容 string/object ----- json messageBody; if (!root.contains("messageBody")) { std::cerr << "missing 'messageBody'" << std::endl; return false; } if (root["messageBody"].is_string()) { std::string messageBodyStr = root["messageBody"].get(); if (messageBodyStr.empty()) { std::cerr << "'messageBody' is empty" << std::endl; return false; } try { messageBody = json::parse(messageBodyStr); } catch (const std::exception& e) { std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl; return false; } } else if (root["messageBody"].is_object()) { messageBody = root["messageBody"]; } else { std::cerr << "'messageBody' is neither string nor object" << std::endl; return false; } // ----- MOD END: messageBody 兼容 string/object ----- // [MOD] 基础必填字段仅校验 guid/code/processNo/fun;frontType、processNum 改为按功能分支再校验 // ----- MOD BEGIN: 基础字段按需校验 ----- if (!messageBody.contains("guid") || !messageBody.contains("code") || !messageBody.contains("processNo") || !messageBody.contains("fun")) { std::cout << "Missing one or more 
required fields in messageBody." << std::endl; return false; } // ----- MOD END: 基础字段按需校验 ----- std::string guid, code_str, fun; // [MOD] frontType 改为可选,给默认值 "all" // ----- MOD BEGIN: frontType 可选 ----- std::string frontType = "all"; // ----- MOD END: frontType 可选 ----- int index_value = 0; try { guid = messageBody["guid"].get(); code_str = messageBody["code"].get(); index_value = messageBody["processNo"].get(); fun = messageBody["fun"].get(); // [MOD] 仅当存在 frontType 且为 string 时再读取,保持兼容 // ----- MOD BEGIN: frontType 存在才解析 ----- if (messageBody.contains("frontType") && messageBody["frontType"].is_string()) { frontType = messageBody["frontType"].get(); } // ----- MOD END: frontType 存在才解析 ----- } catch (const std::exception& e) { std::cerr << "Field parsing error: " << e.what() << std::endl; return false; } // 判断进程号是否匹配(保留原逻辑) if (index_value != g_front_seg_index && g_front_seg_index != 0) { std::cout << "msg index: " << index_value << " doesn't match self index: " << g_front_seg_index << std::endl; return true; } std::cout << "msg index: " << index_value << " self index: " << g_front_seg_index << std::endl; DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的进程控制消息", g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_SET.c_str()); if (code_str == "set_process") { // [MOD] 按功能分支分别校验参数: // reset/add 需要 processNum(且可选 frontType,默认 all); // delete 不需要 frontType/processNum // ----- MOD BEGIN: 分功能按需校验与执行 ----- if (fun == "reset" || fun == "add") { if (!messageBody.contains("processNum")) { std::cout << "Missing 'processNum' in JSON." << std::endl; return false; } int processNum = 0; try { processNum = messageBody["processNum"].get(); } catch (...) { std::cout << "'processNum' parsing failed." 
<< std::endl; return false; } // 校验参数并执行(保留你原校验条件,frontType 允许默认 all) if ((processNum >= 1 && processNum < 10) && (frontType == "cloudfront" || frontType == "all")) { // if (g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1) { if (g_front_seg_index == 1) { execute_bash(fun, processNum, frontType); DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程", g_front_seg_index, fun.c_str()); send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到重置进程指令,重启所有进程!"); std::cout << "this msg should only execute once" << std::endl; } else { std::cout << "only cfg_stat_data index 1 can control process, this process not handle this msg" << std::endl; } } else { std::cout << "param is not executable" << std::endl; } } else if (fun == "delete") { // delete 分支:不要求 frontType/processNum send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到删除进程指令,这个进程将会重启 "); DIY_WARNLOG("process", "【WARN】前置的%d号进程执行指令:%s,即将重启", g_front_seg_index, fun.c_str()); std::this_thread::sleep_for(std::chrono::seconds(3)); ::_exit(-1039); // 进程退出 } else { std::cout << "param is not executable" << std::endl; } // ----- MOD END: 分功能按需校验与执行 ----- } else { std::cout << "set process code str error" << std::endl; } return true; } bool parseJsonMessageLOG(const std::string& json_str) { json root; try { root = json::parse(json_str); } catch (const std::exception& e) { std::cout << "Error parsing JSON: " << e.what() << std::endl; return false; } // 提取 messageBody(JSON 字符串) if (!root.contains("messageBody") || !root["messageBody"].is_string()) { std::cerr << "'messageBody' is missing or not a string" << std::endl; return false; } std::string messageBodyStr = root["messageBody"]; if (messageBodyStr.empty()) { std::cerr << "'messageBody' is empty." 
<< std::endl; return false; } json messageBody; try { messageBody = json::parse(messageBodyStr); } catch (const std::exception& e) { std::cerr << "Failed to parse 'messageBody': " << e.what() << std::endl; return false; } // 校验字段是否存在 static const std::array required_fields = { "guid", "code", "processNo", "id", "level", "grade", "logtype", "frontType" }; for (const auto& field : required_fields) { if (!messageBody.contains(field)) { std::cout << "Missing '" << field << "' in messageBody." << std::endl; return false; } } // 提取字段 std::string guid, code_str, id, level, grade, logtype, frontType; int processNo = 0; try { guid = messageBody["guid"].get(); code_str = messageBody["code"].get(); processNo = messageBody["processNo"].get(); id = messageBody["id"].get(); level = messageBody["level"].get(); grade = messageBody["grade"].get(); logtype = messageBody["logtype"].get(); frontType = messageBody["frontType"].get(); } catch (const std::exception& e) { std::cerr << "Error extracting fields: " << e.what() << std::endl; return false; } // 判断进程号是否匹配 if (processNo != g_front_seg_index) { std::cout << "msg index: " << processNo << " doesn't match self index: " << g_front_seg_index << std::endl; return true; } // 判断 frontType 是否匹配 /*if (frontType != subdir) { std::cout << "msg frontType: " << frontType << " doesn't match self frontType: " << subdir << std::endl; return true; }*/ DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理日志上送消息", g_front_seg_index); std::cout << "msg index: " << processNo << " self index: " << g_front_seg_index << std::endl; /*std::cout << "msg frontType: " << frontType << " self frontType: " << subdir << std::endl;*/ // 回复消息 send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到实时日志指令"); if (code_str == "set_log") { // 校验数据合法性 bool valid = (level == "terminal" || level == "measurepoint") && (grade == "NORMAL" || grade == "DEBUG") && (logtype == "com" || logtype == "data") && (!id.empty() && !is_blank(id)); if (valid) { process_log_command(id, 
level, grade, logtype); } else { std::cout << "type doesn't match" << std::endl; DIY_WARNLOG("process", "【WARN】前置的%d号进程处理日志上送消息,格式不正确", g_front_seg_index); } std::cout << "this msg should only execute once" << std::endl; } return true; } bool parseJsonMessageUD(const std::string& json_str, const std::string& output_dir) { json root; try { root = json::parse(json_str); } catch (...) { std::cout << "Error parsing JSON." << std::endl; return false; } if (!root.contains("messageBody") || !root["messageBody"].is_string()) { std::cerr << "'messageBody' is missing or is not a string" << std::endl; return false; } std::string messageBodyStr = root["messageBody"]; if (messageBodyStr.empty()) { std::cerr << "'messageBody' is empty." << std::endl; return false; } json messageBody; try { messageBody = json::parse(messageBodyStr); } catch (...) { std::cerr << "Failed to parse 'messageBody' JSON." << std::endl; return false; } // 提取字段 std::string code_str = messageBody.value("code", ""); int process_No = messageBody.value("processNo", -1); std::string guid = messageBody.value("guid", ""); if (guid.empty() || code_str.empty() || process_No == -1) { std::cout << "Missing required fields: guid/code/processNo" << std::endl; return false; } if (process_No != g_front_seg_index && g_front_seg_index != 0) { std::cout << "msg index: " << process_No << " doesn't match self index: " << g_front_seg_index << std::endl; return true; } std::cout << "msg index: " << process_No << " self index: " << g_front_seg_index << std::endl; DIY_INFOLOG("process", "【NORMAL】前置的%d号进程处理topic:%s_%s的台账更新消息", g_front_seg_index, FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); //send_reply_to_queue(guid, static_cast(ResponseCode::ACCEPTED), "收到台账更新指令"); std::vector reply_list; if (code_str == "add_terminal" || code_str == "ledger_modify") { std::cout << "add or modify ledger" << std::endl; if (messageBody.contains("data") && messageBody["data"].is_array()) { for (const auto& item : messageBody["data"]) { 
terminal_dev json_data{}; json_data.terminal_id = item.value("id", ""); json_data.terminal_name = item.value("name", ""); //json_data.org_name = item.value("org_name", ""); //json_data.maint_name = item.value("maint_name", ""); //json_data.station_name = item.value("stationName", ""); //json_data.tmnl_factory = item.value("manufacturer", ""); //json_data.tmnl_status = item.value("status", ""); json_data.dev_type = item.value("devType", ""); //json_data.dev_key = item.value("devKey", ""); //json_data.dev_series = item.value("series", ""); int procNo = -1; // 兼容 item.processNo 类型 if (item.contains("processNo")) { if (item["processNo"].is_number_integer()) procNo = item["processNo"].get(); else if (item["processNo"].is_string()) { try { procNo = std::stoi(item["processNo"].get()); } catch(...) { procNo = -1; } } } json_data.processNo = std::to_string(procNo); //int procNum = item.value("maxProcessNum", -1); //json_data.maxProcessNum = std::to_string(procNum); json_data.mac = item.value("ip", ""); //json_data.port = item.value("port", ""); //json_data.timestamp = item.value("updateTime", ""); json_data.Righttime = item.value("Righttime", ""); if (item.contains("monitorData") && item["monitorData"].is_array()) { for (const auto& monitor_item : item["monitorData"]) { json_data.line.emplace_back(); // 追加元素 auto& m = json_data.line.back(); // 引用最后一个 m.monitor_id = monitor_item.value("id", ""); m.monitor_name = monitor_item.value("name", ""); m.logical_device_seq = monitor_item.value("lineNo", ""); m.voltage_level = monitor_item.value("voltageLevel", ""); // status 可能是数字,统一转成字符串存 if (monitor_item.contains("status") && monitor_item["status"].is_number_integer()) m.status = std::to_string(monitor_item["status"].get()); else m.status = monitor_item.value("status", ""); m.terminal_connect = monitor_item.value("ptType", ""); m.terminal_id = monitor_item.value("deviceId", json_data.terminal_id); // 兼容大小写写法:PT1/pt1 等 auto get_num = [&](const nlohmann::json& j, const char* up, 
const char* lo, double defv) { if (j.contains(up) && (j[up].is_number_float() || j[up].is_number_integer())) return j[up].get(); if (j.contains(lo) && (j[lo].is_number_float() || j[lo].is_number_integer())) return j[lo].get(); return defv; }; m.PT1 = get_num(monitor_item, "PT1", "pt1", 0.0); m.PT2 = get_num(monitor_item, "PT2", "pt2", 0.0); m.CT1 = get_num(monitor_item, "CT1", "ct1", 0.0); m.CT2 = get_num(monitor_item, "CT2", "ct2", 0.0); } } print_terminal(json_data); /*std::string xmlContent = prepare_update(code_str, json_data, guid); if (!xmlContent.empty()) { char nodeid[20]; std::sprintf(nodeid, "%u", g_node_id); std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_" + code_str + ".xml"; writeToFile(file_name, xmlContent); }*/ if(code_str == "add_terminal"){ DeviceReply one; one.deviceId = json_data.terminal_id; std::lock_guard lock(ledgermtx); // ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中 const std::string& tid = json_data.terminal_id; auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(), [&](const terminal_dev& d){ return d.terminal_id == tid; }); if (it == terminal_devlist.end()) { init_loggers_bydevid(json_data.terminal_id); terminal_devlist.push_back(json_data); //调用接口添加到通讯列表 DeviceInfo device = make_device_from_terminal(json_data); ClientManager::instance().add_device(device); /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), "终端 id: " + json_data.terminal_id + " 台账添加成功");*/ one.code = static_cast(ResponseCode::OK); one.result = "台账添加成功"; } else{ /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), "终端 id: " + json_data.terminal_id + " 已存在该装置,修改这个装置的台账");*/ if(erase_one_terminals_by_id(json_data.terminal_id) == 1){ //删除旧的 ClientManager::instance().remove_device(json_data.terminal_id); init_loggers_bydevid(json_data.terminal_id); terminal_devlist.push_back(json_data); //调用接口添加到通讯列表 DeviceInfo device = 
make_device_from_terminal(json_data); ClientManager::instance().add_device(device); /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), "终端 id: " + json_data.terminal_id + " 台账修改成功");*/ one.code = static_cast(ResponseCode::OK); one.result = "装置已存在,台账修改成功"; } else{ /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), "终端 id: " + json_data.terminal_id + " 台账修改失败");*/ one.code = static_cast(ResponseCode::BAD_REQUEST); one.result = "装置已存在,但是无法擦除旧数据,台账修改失败"; } } reply_list.push_back(std::move(one)); } else if(code_str == "ledger_modify"){ DeviceReply one; one.deviceId = json_data.terminal_id; std::lock_guard lock(ledgermtx); // ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中 const std::string& tid = json_data.terminal_id; auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(), [&](const terminal_dev& d){ return d.terminal_id == tid; }); if (it == terminal_devlist.end()) { /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::NOT_FOUND), "终端 id: " + tid + " 无法修改台账,未找到指定装置,改为添加这个装置");*/ DIY_WARNLOG("process", "【WARN】无法修改台账,未找到指定装置: %s ,改为添加这个装置", tid.c_str()); init_loggers_bydevid(json_data.terminal_id); terminal_devlist.push_back(json_data); //调用接口添加到通讯列表 DeviceInfo device = make_device_from_terminal(json_data); ClientManager::instance().add_device(device); /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), "终端 id: " + json_data.terminal_id + " 台账添加成功");*/ one.code = static_cast(ResponseCode::OK); one.result = "装置不存在,台账添加成功"; } else{ if(erase_one_terminals_by_id(json_data.terminal_id) == 1){ //删除旧的 ClientManager::instance().remove_device(json_data.terminal_id); init_loggers_bydevid(json_data.terminal_id); terminal_devlist.push_back(json_data); //调用接口添加到通讯列表 DeviceInfo device = make_device_from_terminal(json_data); ClientManager::instance().add_device(device); /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), "终端 id: " + json_data.terminal_id + " 台账修改成功");*/ one.code = 
static_cast(ResponseCode::OK); one.result = "台账修改成功"; } else{ /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), "终端 id: " + json_data.terminal_id + " 台账修改失败");*/ one.code = static_cast(ResponseCode::BAD_REQUEST); one.result = "无法擦除旧数据,台账修改失败"; } } reply_list.push_back(std::move(one)); } } } } else if (code_str == "delete_terminal") { std::cout << "delete ledger" << std::endl; if (messageBody.contains("data") && messageBody["data"].is_array()) { for (const auto& item : messageBody["data"]) { terminal_dev json_data{}; auto id = item.value("id", ""); json_data.terminal_id = id; /*std::string xmlContent = prepare_update(code_str, json_data, guid); if (!xmlContent.empty()) { char nodeid[20]; std::sprintf(nodeid, "%u", g_node_id); std::string file_name = output_dir + "/" + nodeid + "_" + std::to_string(g_front_seg_index) + "_" + json_data.terminal_id + "_delete_terminal.xml"; writeToFile(file_name, xmlContent); }*/ DeviceReply one; one.deviceId = json_data.terminal_id; //直接加锁删除 std::lock_guard lock(ledgermtx); // ① 先判断 json_data.terminal_id 是否已在当前进程维护的终端列表中 const std::string& tid = json_data.terminal_id; auto it = std::find_if(terminal_devlist.begin(), terminal_devlist.end(), [&](const terminal_dev& d){ return d.terminal_id == tid; }); if (it == terminal_devlist.end()) { // 终端 id 不存在于当前进程维护的终端列表中 one.code = static_cast(ResponseCode::OK); one.result = "装置不存在,无法删除台账"; } else { // 终端 id 存在于当前进程维护的终端列表中 if(erase_one_terminals_by_id(json_data.terminal_id) == 1){ ClientManager::instance().remove_device(json_data.terminal_id); /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::OK), "终端 id: " + json_data.terminal_id + " 台账删除成功");*/ one.code = static_cast(ResponseCode::OK); one.result = "台账删除成功"; } else{ /*send_reply_to_queue(json_data.guid, static_cast(ResponseCode::BAD_REQUEST), "终端 id: " + json_data.terminal_id + " 台账删除失败");*/ one.code = static_cast(ResponseCode::BAD_REQUEST); one.result = "无法擦除旧数据,台账删除失败"; } } 
reply_list.push_back(std::move(one)); } } } else { std::cout << "code_str error" << std::endl; } send_batch_reply_to_queue(guid, reply_list); return true; } /////////////////////////////////////////////////////////////////////////////////////////////////回调函数 rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& msg) { //未初始化不处理消费 if (INITFLAG != 1) { return rocketmq::RECONSUME_LATER; } // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) // ----- MOD BEGIN: 启动后消息过滤 ----- if (!should_process_after_start(msg)) { std::cout << "[SET] skip old message: " << "topic=" << msg.getTopic() << ", queueId=" << msg.getQueueId() << ", offset=" << msg.getQueueOffset() << ", bornTs=" << msg.getBornTimestamp() << ", appStart=" << G_APP_START_MS << std::endl; return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 } // ----- MOD END: 启动后消息过滤 ----- std::string body = msg.getBody(); std::string key = msg.getKeys(); if (body.empty()) { std::cerr << "Message body is NULL or empty." << std::endl; return rocketmq::RECONSUME_LATER; } // 日志记录 DIY_INFOLOG("process", "【NORMAL】前置消费topic:%s_%s的实时触发消息",FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); std::cout << "rtdata Callback received message: " << body << std::endl; if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } // 消息解析 std::string devid; ushort line; bool realData = false, soeData = false; int limit = 0; int idx = 0; if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit,idx)) { std::cerr << "Failed to parse the JSON message." 
<< std::endl; DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); return rocketmq::RECONSUME_LATER; } // 加锁访问台账 if( !devid.empty() && line > 0){ //不再使用文件触发方式,直接调用接口向终端发起请求 //不注册guid,直接将请求指令下发装置,排队处理 //添加在线判断 if (ClientManager::instance().get_dev_status(devid) != 1) { std::cout << "devid对应装置不在线: " << devid << std::endl; // 记录日志不响应 web 端 DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的实时数据触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str()); return rocketmq::CONSUME_SUCCESS; } //记录idx devidx_set(devid, idx);//每次下发都会更新,不加入运行用的结构体 ClientManager::instance().set_real_state_count(devid, 60, line);//一秒询问一次,询问60次,下一次同一个测点调用的话就会刷新 } else{ std::cerr << "rtdata is NULL." << std::endl; DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); return rocketmq::RECONSUME_LATER; } return rocketmq::CONSUME_SUCCESS; } rocketmq::ConsumeStatus myMessageCallbackupdate(const rocketmq::MQMessageExt& msg) { //未初始化不处理消费 if (INITFLAG != 1) { return rocketmq::RECONSUME_LATER; } // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) // ----- MOD BEGIN: 启动后消息过滤 ----- if (!should_process_after_start(msg)) { std::cout << "[SET] skip old message: " << "topic=" << msg.getTopic() << ", queueId=" << msg.getQueueId() << ", offset=" << msg.getQueueOffset() << ", bornTs=" << msg.getBornTimestamp() << ", appStart=" << G_APP_START_MS << std::endl; return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 } // ----- MOD END: 启动后消息过滤 ----- std::string body = msg.getBody(); std::string key = msg.getKeys(); if (body.empty()) { std::cerr << "Message body is NULL or empty." 
<< std::endl; return rocketmq::RECONSUME_LATER; } // 打印日志 std::cout << "ledger update Callback received message: " << body << std::endl; if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } // 调用业务逻辑处理函数 std::string updatefilepath = FRONT_PATH + "/etc/ledgerupdate"; if (!parseJsonMessageUD(body, updatefilepath)) { DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_UD.c_str()); } return rocketmq::CONSUME_SUCCESS; } rocketmq::ConsumeStatus myMessageCallbackset(const rocketmq::MQMessageExt& msg) { //未初始化不处理消费 if (INITFLAG != 1) { return rocketmq::RECONSUME_LATER; } // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) // ----- MOD BEGIN: 启动后消息过滤 ----- if (!should_process_after_start(msg)) { std::cout << "[SET] skip old message: " << "topic=" << msg.getTopic() << ", queueId=" << msg.getQueueId() << ", offset=" << msg.getQueueOffset() << ", bornTs=" << msg.getBornTimestamp() << ", appStart=" << G_APP_START_MS << std::endl; return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 } // ----- MOD END: 启动后消息过滤 ----- std::string body = msg.getBody(); std::string key = msg.getKeys(); if (body.empty()) { std::cerr << "Message body is NULL or empty." 
<< std::endl; return rocketmq::RECONSUME_LATER; } // 打印消息内容和 key std::cout << "process set Callback received message: " << body << std::endl; if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } // 调用业务处理逻辑 if (!parseJsonMessageSET(body)) { DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s - tag:%s的进程控制消息失败,消息的json结构不正确", g_front_seg_index, G_MQCONSUMER_TOPIC_SET.c_str(), FRONT_INST.c_str()); } return rocketmq::CONSUME_SUCCESS; } rocketmq::ConsumeStatus myMessageCallbacklog(const rocketmq::MQMessageExt& msg) { //未初始化不处理消费 if (INITFLAG != 1) { return rocketmq::RECONSUME_LATER; } // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) // ----- MOD BEGIN: 启动后消息过滤 ----- if (!should_process_after_start(msg)) { std::cout << "[SET] skip old message: " << "topic=" << msg.getTopic() << ", queueId=" << msg.getQueueId() << ", offset=" << msg.getQueueOffset() << ", bornTs=" << msg.getBornTimestamp() << ", appStart=" << G_APP_START_MS << std::endl; return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 } // ----- MOD END: 启动后消息过滤 ----- std::string body = msg.getBody(); std::string key = msg.getKeys(); if (body.empty()) { std::cerr << "Message body is NULL or empty." 
<< std::endl; return rocketmq::RECONSUME_LATER; } // 打印日志信息 std::cout << "log Callback received message: " << body << std::endl; if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } // 执行日志上送处理 if (!parseJsonMessageLOG(body)) { DIY_ERRORLOG("process", "【ERROR】前置的%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_LOG.c_str()); } return rocketmq::CONSUME_SUCCESS; } rocketmq::ConsumeStatus myMessageCallbackrecall(const rocketmq::MQMessageExt& msg) { //未初始化不处理消费 if (INITFLAG != 1) { return rocketmq::RECONSUME_LATER; } // [MOD] 仅消费启动后的消息:历史消息直接跳过并 ACK(即使并发也安全) // ----- MOD BEGIN: 启动后消息过滤 ----- if (!should_process_after_start(msg)) { std::cout << "[SET] skip old message: " << "topic=" << msg.getTopic() << ", queueId=" << msg.getQueueId() << ", offset=" << msg.getQueueOffset() << ", bornTs=" << msg.getBornTimestamp() << ", appStart=" << G_APP_START_MS << std::endl; return rocketmq::CONSUME_SUCCESS; // 确认成功,避免重投 } // ----- MOD END: 启动后消息过滤 ----- // 调试输出 std::cout << "myMessageCallbackrecall" << std::endl; std::string body = msg.getBody(); std::string key = msg.getKeys(); if (body.empty()) { std::cerr << "Message body is NULL or empty." 
/**
 * @brief Writes XML text to a file, replacing any previous content.
 *
 * @param filePath   Destination path; the file is created or truncated.
 * @param xmlContent Text written verbatim to the file.
 * @return true only when the file was opened AND the content was written and
 *         flushed without a stream error; false otherwise (a diagnostic is
 *         printed to std::cerr).
 */
bool writeToFile(const std::string& filePath, const std::string& xmlContent)
{
    std::ofstream outFile(filePath.c_str());
    if (!outFile.is_open()) {
        std::cerr << "Failed to open file for writing: " << filePath << std::endl;
        return false;
    }

    outFile << xmlContent;
    // close() flushes the buffer; a failed write (e.g. disk full) only becomes
    // visible in the stream state after this point, so check it before
    // reporting success.
    outFile.close();
    if (outFile.fail()) {
        std::cerr << "Failed to write file: " << filePath << std::endl;
        return false;
    }

    std::cout << "XML file created at: " << filePath << std::endl;
    return true;
}
/**
 * @brief Appends one indentation unit per nesting level to the stream.
 *
 * Nothing is written when @p level is zero or negative.
 *
 * NOTE(review): the original comment says each level is two spaces, but the
 * literal visible in this copy of the file is a single space. Confirm against
 * the upstream source; the literal is reproduced here as seen.
 *
 * @param stream Output stream that receives the indentation.
 * @param level  Number of indentation units to append.
 */
void add_indent(std::stringstream& stream, int level)
{
    for (int remaining = level; remaining > 0; --remaining) {
        stream << " ";
    }
}
add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.terminal_name << "" << std::endl; //add_indent(xmlStream, indentLevel); //xmlStream << "" << json_data.timestamp << "" << std::endl; // Assuming `timestamp` //add_indent(xmlStream, indentLevel); //xmlStream << "" << json_data.tmnl_factory << "" << std::endl; //add_indent(xmlStream, indentLevel); //xmlStream << "" << json_data.tmnl_status << "" << std::endl; //add_indent(xmlStream, indentLevel); //xmlStream << "" << json_data.dev_series << "" << std::endl; //lnk20250210 add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.processNo << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.Righttime << "" << std::endl; //add_indent(xmlStream, indentLevel); //xmlStream << "" << json_data.dev_key << "" << std::endl; //add_indent(xmlStream, indentLevel); //xmlStream << "" << json_data.mac << "" << std::endl; // monitorData 部分 for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) { const ledger_monitor& monitor = json_data.line[i]; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.monitor_id << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.monitor_name << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.logical_device_seq << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.voltage_level << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.terminal_connect << "" << std::endl; //add_indent(xmlStream, indentLevel); //xmlStream << "" << monitor.timestamp << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.terminal_id << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.CT1 << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.CT2 << "" << std::endl; 
add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.PT1 << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.PT2 << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.status << "" << std::endl; indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; } indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; // 结束 modify 或 add 标签 if (code_str == "ledger_modify") { indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; } else { indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; } } else if (code_str == "delete_terminal") { // 如果是 delete 类型 add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.terminal_id << "" << std::endl; indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; } else { std::cerr << "code_str error" << std::endl; return ""; } // 结束根节点 indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; return xmlStream.str(); // 返回构造的 XML 字符串 } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////终端连接消息 void connect_status_to_queue(const std::string& id, const std::string& datetime, int status)//这个不使用,使用新的带有时间封装的 { try { // 构造 JSON nlohmann::json jsonObject; jsonObject["id"] = id; jsonObject["date"] = datetime; jsonObject["status"] = status; // 构造发送结构 queue_data_t data; data.strTopic = G_CONNECT_TOPIC; data.strText = jsonObject.dump(); // 转换为字符串 data.tag = G_CONNECT_TAG; data.key = G_CONNECT_KEY; //if (g_node_id == STAT_DATA_BASE_NODE_ID) { std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); //} } catch 
(const std::exception& e) { std::cerr << "connect_status_to_queue exception: " << e.what() << std::endl; } } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////响应消息 void send_reply_to_queue(const std::string& guid, const int code, const std::string& result) { try { // 构造 JSON 对象 nlohmann::json obj; obj["guid"] = guid; obj["code"] = code; obj["result"] = result; obj["processNo"] = g_front_seg_index; obj["nodeId"] = FRONT_INST; // 构造 queue 消息 queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = obj.dump(); // 序列化为 JSON 字符串 connect_info.tag = Topic_Reply_Tag; connect_info.key = Topic_Reply_Key; // 加入发送队列(线程安全) std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(connect_info); } catch (const std::exception& e) { std::cerr << "send_reply_to_queue exception: " << e.what() << std::endl; } } //台账更新批量回复 void send_batch_reply_to_queue(const std::string& guid, const std::vector& replies) { try { nlohmann::json root; root["guid"] = guid; nlohmann::json arr = nlohmann::json::array(); for (const auto& r : replies) { nlohmann::json item; item["deviceId"] = r.deviceId; item["code"] = r.code; item["result"] = r.result; arr.push_back(std::move(item)); } root["data"] = std::move(arr); queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = root.dump(); connect_info.tag = "LEDGER"; connect_info.key = Topic_Reply_Key; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(std::move(connect_info)); } catch (const std::exception& e) { std::cerr << "send_batch_reply_to_queue exception: " << e.what() << std::endl; } } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////心跳消息 void send_heartbeat_to_queue(const std::string& status) { try{ nlohmann::json obj; obj["nodeId"] = FRONT_INST; obj["frontType"] = "cloudfront"; obj["processNo"] = g_front_seg_index; 
obj["status"] = status; queue_data_t connect_info; connect_info.strTopic = Heart_Beat_Topic; connect_info.strText = obj.dump(); // 紧凑格式 JSON connect_info.tag = Heart_Beat_Tag; connect_info.key = Heart_Beat_Key; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(connect_info); } catch (const std::exception& e) { std::cerr << "send_heartbeat_to_queue exception: " << e.what() << std::endl; } } /////////////////////////////////////////////////////////////////////////////////////////////////////////////稳态测试函数 bool shouldSkipTerminal(const std::string& terminal_id) { for (size_t i = 0; i < TESTARRAY.size(); ++i) { if (TESTARRAY[i] == terminal_id) { return true; } } return false; } void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { if (!INITFLAG) { std::cout << "前置未初始化完成\n"; return; } if (!front || !front->m_producer) { std::cerr << "front 或 producer 无效\n"; return; } rocketmq::RocketMQProducer* producer = front->m_producer; queue_data_t data; data.strTopic = G_ROCKETMQ_TOPIC_TEST; data.mp_id = "0"; std::vector filenames = { "long_string.txt", "PLT_string.txt", "fluc_string.txt", "qvvr_string.txt" }; for (const auto& filename : filenames) { std::ifstream file(filename); if (!file.is_open()) { std::cerr << "跳过无法打开的文件: " << filename << std::endl; continue; } std::stringstream buffer; buffer << file.rdbuf(); std::string base_strText = buffer.str(); std::time_t t = std::time(nullptr); std::tm* time_info = std::localtime(&t); time_info->tm_sec = 0; std::time_t base_time_t = std::mktime(time_info); long long current_time_ms = static_cast(base_time_t) * 1000; int total_messages = mpnum; if (type == 0) { std::cout << "use ledger send msg" << std::endl; //根据台账模式下每个进程都会发送 for (size_t i = 0; total_messages > 0 && i < terminal_devlist.size(); ++i) { const auto& dev = terminal_devlist[i]; if (shouldSkipTerminal(dev.terminal_id)) { std::cout << dev.terminal_id << " use true message" << std::endl; continue; } for (size_t j = 0; j < 
dev.line.size(); ++j) { if (dev.line[j].monitor_id.empty()) break; data.mp_id = dev.line[j].monitor_id; data.monitor_no = static_cast(i + j); std::string modified_time = std::to_string(current_time_ms / 1000); std::string modified_strText = base_strText; try { auto j = nlohmann::json::parse(modified_strText); j["Did"] = i; if (j.contains("Msg") && j["Msg"].is_object()) { j["Msg"]["Cldid"] = j; if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) { for (auto& item : j["Msg"]["DataArray"]) { if (item.is_object()) { item["DataTimeSec"] = std::stoll(modified_time); } } } } modified_strText = j.dump(); } catch (...) { // 保持原始文本 } data.strText = modified_strText; data.tag = G_ROCKETMQ_TAG_TEST; data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_no << " and TIME " << modified_time << std::endl; } } } else { std::cout << "use monitor + number send msg" << std::endl; //根据虚构监测点模式下只有进程1发送 for (int i = 0; (total_messages > 0 && g_front_seg_index == 1 ) && i < total_messages; ++i) { std::string monitor_id = "testmonitor" + std::to_string(i); data.mp_id = monitor_id; data.monitor_no = i; std::string modified_time = std::to_string(current_time_ms / 1000); std::string modified_strText = base_strText; try { auto j = nlohmann::json::parse(modified_strText); j["Did"] = 0; if (j.contains("Msg") && j["Msg"].is_object()) { j["Msg"]["Cldid"] = data.mp_id; if (j["Msg"].contains("DataArray") && j["Msg"]["DataArray"].is_array()) { for (auto& item : j["Msg"]["DataArray"]) { if (item.is_object()) { item["DataTimeSec"] = std::stoll(modified_time); } } } } modified_strText = j.dump(); } catch (...) 
{ // 保持原始文本 } data.strText = modified_strText; data.tag = G_ROCKETMQ_TAG_TEST; data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); std::cout << "Sent message " << (i + 1) << " with Monitor " << data.monitor_no << " and TIME " << modified_time << std::endl; } } std::cout << "Finished sending " << total_messages << " messages." << std::endl; } } ////////////////////////////////////////////////////////////////////////////////////////////////////////////其他测试函数 void rocketmq_test_rt(Front* front)//用来测试实时数据 { if (!front || !front->m_producer) { std::cerr << "front 或 producer 无效\n"; return; } rocketmq::RocketMQProducer* producer = front->m_producer; queue_data_t data; data.monitor_no = 123; data.strTopic = G_MQCONSUMER_TOPIC_RT; std::ifstream file("rt.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = std::string(buffer.str()); data.mp_id = "123123"; data.tag = FRONT_INST; data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } void rocketmq_test_ud(Front* front)//用来测试台账更新 { if (!front || !front->m_producer) { std::cerr << "front 或 producer 无效\n"; return; } rocketmq::RocketMQProducer* producer = front->m_producer; queue_data_t data; data.monitor_no = 123; data.strTopic = G_MQCONSUMER_TOPIC_UD; std::ifstream file("ud.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = std::string(buffer.str()); data.mp_id = "123123"; data.tag = FRONT_INST; data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } void rocketmq_test_set(Front* front)//用来测试进程控制脚本 { if (!front || !front->m_producer) { std::cerr << "front 或 producer 无效\n"; return; } rocketmq::RocketMQProducer* producer = front->m_producer; queue_data_t data; data.monitor_no = 123; data.strTopic = G_MQCONSUMER_TOPIC_SET; std::ifstream file("set.txt"); // 文件中存储长字符串 
std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = std::string(buffer.str()); data.mp_id = "123123"; data.tag = FRONT_INST; data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } void rocketmq_test_rc(Front* front)//用来测试补招 { if (!front || !front->m_producer) { std::cerr << "front 或 producer 无效\n"; return; } rocketmq::RocketMQProducer* producer = front->m_producer; queue_data_t data; data.monitor_no = 123; data.strTopic = G_MQCONSUMER_TOPIC_RC; std::ifstream file("rc.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = std::string(buffer.str()); data.mp_id = "123123"; data.tag = FRONT_INST; data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } ////////////////////////////////////////////////////////////////////////////////////////////////////////////云前置新增功能 bool parseJsonMessageCLOUD(const std::string &body, std::string &devid, std::string &guid, nlohmann::json &detailObj, // 这里返回整个 Detail std::string &front_ip, // 新增:返回 FrontIP int &node) // 新增:返回 Node { try { auto j = nlohmann::json::parse(body); // guid if (j.contains("guid") && j["guid"].is_string()) { guid = j["guid"].get(); } else { guid.clear(); } // FrontIP if (j.contains("FrontIP") && j["FrontIP"].is_string()) { front_ip = j["FrontIP"].get(); } else { front_ip.clear(); } // Node if (j.contains("Node") && j["Node"].is_number_integer()) { node = j["Node"].get(); } else { node = 0; } // Dev_id(兼容字符串或数字) if (j.contains("Dev_id")) { if (j["Dev_id"].is_string()) { devid = j["Dev_id"].get(); } else if (j["Dev_id"].is_number_integer()) { devid = std::to_string(j["Dev_id"].get()); } else if (j["Dev_id"].is_number_unsigned()) { devid = std::to_string(j["Dev_id"].get()); } else if (j["Dev_id"].is_number_float()) { devid = std::to_string(j["Dev_id"].get()); } else { devid.clear(); } } else { devid.clear(); } // Detail(完整放入 detailObj if 
(j.contains("Detail") && j["Detail"].is_object()) { detailObj = j["Detail"]; // 直接保存整个 Detail } else { detailObj = nlohmann::json::object(); } return true; } catch (const std::exception &e) { std::cerr << "[parseJsonMessageCLOUD] JSON parse error: " << e.what() << "\n"; guid.clear(); devid.clear(); front_ip.clear(); node = 0; detailObj = nlohmann::json::object(); return false; } } int recordguid(const std::string &devid, const std::string &guid, int busytype,int busycount) { std::lock_guard lock(ledgermtx); for (auto &dev : terminal_devlist) { if (dev.terminal_id == devid) { if (dev.isbusy == 1) { std::cout << "Dev is busy,busytype is" << dev.busytype << std::endl; //响应guid:正忙 return dev.busytype; // 正在忙,不能记录 } dev.guid = guid; dev.busytype = busytype; dev.isbusy = busycount; dev.busytimecount = 0; return 0; } } std::cout << "Dev not found" << std::endl; //响应guid:失败 return -1; // 未找到对应的装置 } // 按 type 解析 Msg bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann::json& detailObj) { MsgParsed parsed; nlohmann::json msgObj; // 直接解析 detailObj 的 Type if (detailObj.contains("Type")) { if (detailObj["Type"].is_string()) { try { parsed.type = std::stoi(detailObj["Type"].get(), nullptr, 0); // 支持 "0x2106" 格式 } catch (...) 
{ return false; } } else if (detailObj["Type"].is_number_integer()) { parsed.type = detailObj["Type"].get(); } else if (detailObj["Type"].is_number_unsigned()) { parsed.type = static_cast(detailObj["Type"].get()); } else { return false; } } else { return false; } // 直接解析 detailObj 的 Msg if (detailObj.contains("Msg") && detailObj["Msg"].is_object()) { msgObj = detailObj["Msg"]; } else { msgObj = nlohmann::json::object(); } try { switch (parsed.type) { case 0x2131: { // 读取目录 if(!recordguid(devid,guid,static_cast(DeviceState::READING_FILEMENU),1)){ return true; } if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false; parsed.name = msgObj["Name"].get(); parsed.ok = true; std::cout << "[dir parsemsg] Name: " << parsed.name << std::endl; // 添加指令到队列当中 ClientManager::instance().add_file_menu_action_to_device(devid, parsed.name); return true; } case 0x2132: { // 下载文件 if(!recordguid(devid,guid,static_cast(DeviceState::READING_FILEDATA),1)){ return true; } if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false; parsed.name = msgObj["Name"].get(); parsed.ok = true; std::cout << "[file parsemsg] Name: " << parsed.name << std::endl; // 下发指令 ClientManager::instance().add_file_download_action_to_device(devid, parsed.name); return true; } case 0x2106: { // 定值/内部定值 if (!msgObj.contains("Cldid") || !msgObj["Cldid"].is_number_integer()) return false; if (!msgObj.contains("DataType") || !msgObj["DataType"].is_number_integer()) return false; if (!msgObj.contains("Operate") || !msgObj["Operate"].is_number_integer()) return false; if (!msgObj.contains("DataArray")|| !msgObj["DataArray"].is_array()) return false; parsed.cldid = msgObj["Cldid"].get(); parsed.datatype = msgObj["DataType"].get(); parsed.operate = msgObj["Operate"].get(); // 调试打印 std::cout << "[parsemsg] Cldid=" << parsed.cldid << ", DataType=0x" << std::hex << parsed.datatype << std::dec << ", Operate=" << parsed.operate << std::endl; // 先清空数组,避免复用对象时残留 parsed.dataArray_f.clear(); 
parsed.dataArray_us.clear(); switch (parsed.datatype) { case 0x0C: { // 定值(float 阵列) for (const auto& v : msgObj["DataArray"]) { if (!v.is_number()) return false; // 统一按 double 取,再强转成 float 更稳妥 parsed.dataArray_f.push_back(static_cast(v.get())); } // 打印 DataArray std::cout << "[0x0C] DataArray=["; for (size_t i = 0; i < parsed.dataArray_f.size(); ++i) { std::cout << parsed.dataArray_f[i] << (i + 1 < parsed.dataArray_f.size() ? ", " : ""); } std::cout << "]" << std::endl; parsed.ok = true; // 根据 Operate 分流(1=读,2=写) switch (parsed.operate) { case 1: { // 读 if(!recordguid(devid,guid,static_cast(DeviceState::READING_FIXEDVALUE),2)){ return true; } ClientManager::instance().get_fixedvalue_action_to_device( devid, static_cast(parsed.cldid)); // 获取装置测点定值数据 ClientManager::instance().get_fixedvaluedes_action_to_device(devid); // 获取装置定值描述 只有这一步可以响应成功和关闭 break; } case 2: { // 写 if(!recordguid(devid,guid,static_cast(DeviceState::SET_FIXEDVALUE),1)){ return true; } ClientManager::instance().set_fixedvalue_action_to_device( devid, static_cast(parsed.cldid), parsed.dataArray_f); // 装置修改定值 break; } default: return false; } break; } case 0x0D: { // 内部定值(uint16_t 阵列) for (const auto& v : msgObj["DataArray"]) { if (!v.is_number_integer() && !v.is_number_unsigned()) return false; // 范围校验 [0, 65535] long long val = v.get(); if (val < 0 || val > 65535) return false; parsed.dataArray_us.push_back(static_cast(val)); } // 打印 DataArray std::cout << "[0x0D] DataArray=["; for (size_t i = 0; i < parsed.dataArray_us.size(); ++i) { std::cout << parsed.dataArray_us[i] << (i + 1 < parsed.dataArray_us.size() ? 
", " : ""); } std::cout << "]" << std::endl; parsed.ok = true; // 根据 Operate 分流(1=读,2=写) switch (parsed.operate) { case 1: { // 读 if(!recordguid(devid,guid,static_cast(DeviceState::READING_INTERFIXEDVALUE),3)){ return true; } ClientManager::instance().get_interfixedvalue_action_to_device(devid); // 获取内部定值 ClientManager::instance().get_fixedvalucontrolword_action_to_device(devid, 1); // 1-内部定值描述 ClientManager::instance().get_fixedvalucontrolword_action_to_device(devid, 2); // 2-控制字描述 只有这一步可以响应成功和关闭 break; } case 2: { // 写 if(!recordguid(devid,guid,static_cast(DeviceState::SET_INTERFIXEDVALUE),1)){ return true; } ClientManager::instance().set_interfixedvalue_action_to_device(devid, parsed.dataArray_us); break; } default: return false; } break; } default: return false; } return true; } default: return false; } } catch (const std::exception& e) { std::cerr << "[parsemsg] exception: " << e.what() << std::endl; return false; } catch (...) { std::cerr << "[parsemsg] unknown exception" << std::endl; return false; } } //心跳和其他响应 void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type, const std::string& guid, const std::string& mac) { try { /*std::string guid = find_guid_index_from_dev_id(dev_id);*/ if(guid == "") { std::cerr << "dev: " << dev_id << " guid not found" << std::endl; return; } // ---- 构造根 JSON ---- nlohmann::json obj; obj["guid"] = guid; obj["FrontId"] = FRONT_INST; obj["Node"] = g_front_seg_index; // Dev_mac:从台账取 addr_str 并规范化 //std::string mac = get_mac_by_devid(dev_id); obj["Dev_mac"] = normalize_mac(mac); // ---- 构造 Detail ---- nlohmann::json detail; detail["Type"] = type; // Msg nlohmann::json msg; msg["Time"] = static_cast(std::time(nullptr)); detail["Msg"] = std::move(msg); // Code detail["Code"] = reply_code; obj["Detail"] = std::move(detail); // ---- 入队发送 ---- queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = obj.dump(); // 序列化为字符串 connect_info.tag = Topic_Reply_Tag; connect_info.key = 
Topic_Reply_Key; { std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(std::move(connect_info)); } // 调试打印 std::cout << "[send_reply_to_cloud] queued: " << obj.dump() << std::endl; } catch (const std::exception& e) { std::cerr << "send_reply_to_cloud exception: " << e.what() << std::endl; } } //云前置功能 rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg) { //未初始化不处理消费 if (INITFLAG != 1) { return rocketmq::RECONSUME_LATER; } std::string body = msg.getBody(); std::string key = msg.getKeys(); if (body.empty()) { std::cerr << "Message body is NULL or empty." << std::endl; return rocketmq::RECONSUME_LATER; } // 日志记录 DIY_INFOLOG("process", "【NORMAL】前置消费topic:%s_%s的云前置控制消息",FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_CLOUD.c_str()); std::cout << "cloud Callback received message: " << body << std::endl; if (!key.empty()) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } // 消息解析 std::string guid; std::string devid; std::string FrontId; int Node; nlohmann::json DetailObj; if (!parseJsonMessageCLOUD(body, devid, guid, DetailObj,FrontId,Node)) { std::cerr << "Failed to parse the JSON message." << std::endl; DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的云前置控制消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); return rocketmq::RECONSUME_LATER; } // ====== 调试打印 ====== std::cout << "[CLOUD Msg Parsed] " << "guid=" << guid << ", devid=" << devid << ", FrontId=" << FrontId << ", Node=" << Node << std::endl; if(FrontId != FRONT_INST || Node != g_front_seg_index){ std::cout << "当前进程不消费这个消息" << std::endl; return rocketmq::CONSUME_SUCCESS; } if(!parsemsg(devid,guid,DetailObj)){ std::cerr << "clouddata is error." 
<< std::endl; DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的云前置控制消息失败,消息无法解析", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); } return rocketmq::CONSUME_SUCCESS; } void rocketmq_test_getdir(Front* front)//用来测试目录获取 { if (!front || !front->m_producer) { std::cerr << "front 或 producer 无效\n"; return; } rocketmq::RocketMQProducer* producer = front->m_producer; queue_data_t data; data.monitor_no = 123; data.strTopic = G_MQCONSUMER_TOPIC_CLOUD; std::ifstream file("getdir.txt"); // 文件中存储长字符串 std::stringstream buffer; buffer << file.rdbuf(); // 读取整个文件内容 data.strText = std::string(buffer.str()); data.mp_id = "123123"; data.tag = G_ROCKETMQ_TAG_TEST; data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } //状态翻转 void connect_status_update(const std::string& id, int status) { // 获取当前系统时间(格式: YYYY-MM-DD HH:MM:SS) auto now = std::chrono::system_clock::now(); std::time_t now_c = std::chrono::system_clock::to_time_t(now); std::tm tm_buf; localtime_r(&now_c, &tm_buf); std::ostringstream datetime_ss; datetime_ss << std::put_time(&tm_buf, "%Y-%m-%d %H:%M:%S"); std::string datetime_str = datetime_ss.str(); // 构造 JSON 对象 nlohmann::json j; j["id"] = id; j["date"] = datetime_str; j["status"] = std::to_string(status); // 构造队列消息 queue_data_t connect_info; connect_info.strTopic = G_CONNECT_TOPIC; connect_info.strText = j.dump(); // 转成字符串 connect_info.tag = G_CONNECT_TAG; connect_info.key = G_CONNECT_KEY; { std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(std::move(connect_info)); } // 调试打印 std::cout << "[connect_status_update] queued JSON:\n" << j.dump(4) << std::endl; }