add function

This commit is contained in:
lnk
2025-09-02 14:58:19 +08:00
parent 077c3f45a2
commit 0af8d02bb2
10 changed files with 951 additions and 42 deletions

View File

@@ -62,7 +62,6 @@ static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者
//前置进程
extern unsigned int g_node_id;
extern int g_front_seg_index;
extern std::string subdir;
extern std::string FRONT_INST;
@@ -341,6 +340,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer)
/////////////////////////////////////////////////////////////////////////////////////////////////查找台账下标
// 根据终端 ID 查找 terminal_devlist 中的索引,找不到返回 -1
int find_dev_index_from_dev_id(const std::string& dev_id) {
std::lock_guard<std::mutex> lock(ledgermtx);
for (size_t i = 0; i < terminal_devlist.size(); ++i) {
if (terminal_devlist[i].terminal_id == dev_id) {
return static_cast<int>(i);
@@ -350,6 +350,7 @@ int find_dev_index_from_dev_id(const std::string& dev_id) {
}
int find_mp_index_from_mp_id(const std::string& mp_id) {
std::lock_guard<std::mutex> lock(ledgermtx);
for (const auto& dev : terminal_devlist) {
for (size_t j = 0; j < dev.line.size(); ++j) {
if (dev.line[j].monitor_id == mp_id) {
@@ -360,6 +361,16 @@ int find_mp_index_from_mp_id(const std::string& mp_id) {
return -1; // 未找到
}
std::string find_guid_index_from_dev_id(const std::string& dev_id) {
std::lock_guard<std::mutex> lock(ledgermtx);
for (size_t i = 0; i < terminal_devlist.size(); ++i) {
if (terminal_devlist[i].terminal_id == dev_id) {
return terminal_devlist[i].guid;
}
}
return ""; // 未找到
}
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理
std::string parseJsonMessageRC(const std::string& inputJson) {
@@ -1619,6 +1630,352 @@ void rocketmq_test_rc(Front* front)//用来测试补招
////////////////////////////////////////////////////////////////////////////////////////////////////////////云前置新增功能
bool parseJsonMessageCLOUD(const std::string &body,
std::string &devid,
std::string &guid,
nlohmann::json &detailObj, // 这里返回整个 Detail
std::string &front_ip, // 新增:返回 FrontIP
int &node) // 新增:返回 Node
{
try {
auto j = nlohmann::json::parse(body);
// guid
if (j.contains("guid") && j["guid"].is_string()) {
guid = j["guid"].get<std::string>();
} else {
guid.clear();
}
// FrontIP
if (j.contains("FrontIP") && j["FrontIP"].is_string()) {
front_ip = j["FrontIP"].get<std::string>();
} else {
front_ip.clear();
}
// Node
if (j.contains("Node") && j["Node"].is_number_integer()) {
node = j["Node"].get<int>();
} else {
node = 0;
}
// Dev_id兼容字符串或数字
if (j.contains("Dev_id")) {
if (j["Dev_id"].is_string()) {
devid = j["Dev_id"].get<std::string>();
} else if (j["Dev_id"].is_number_integer()) {
devid = std::to_string(j["Dev_id"].get<long long>());
} else if (j["Dev_id"].is_number_unsigned()) {
devid = std::to_string(j["Dev_id"].get<unsigned long long>());
} else if (j["Dev_id"].is_number_float()) {
devid = std::to_string(j["Dev_id"].get<double>());
} else {
devid.clear();
}
} else {
devid.clear();
}
// Detail完整放入 detailObj
if (j.contains("Detail") && j["Detail"].is_object()) {
detailObj = j["Detail"]; // 直接保存整个 Detail
} else {
detailObj = nlohmann::json::object();
}
return true;
}
catch (const std::exception &e) {
std::cerr << "[parseJsonMessageCLOUD] JSON parse error: " << e.what() << "\n";
guid.clear();
devid.clear();
front_ip.clear();
node = 0;
detailObj = nlohmann::json::object();
return false;
}
}
int recordguid(const std::string &devid,
const std::string &guid,
int busytype,int busycount)
{
std::lock_guard<std::mutex> lock(ledgermtx);
for (auto &dev : terminal_devlist) {
if (dev.terminal_id == devid) {
if (dev.isbusy == 1) {
std::cout << "Dev is busybusytype is" << dev.busytype << std::endl;
//响应guid:正忙
return dev.busytype; // 正在忙,不能记录
}
dev.guid = guid;
dev.busytype = busytype;
dev.isbusy = busycount;
dev.busytimecount = 0;
return 0;
}
}
std::cout << "Dev not found" << std::endl;
//响应guid:失败
return -1; // 未找到对应的装置
}
// 按 type 解析 Msg
bool parsemsg(const std::string& devid, const std::string& guid, const nlohmann::json& detailObj) {
MsgParsed parsed;
nlohmann::json msgObj;
// 直接解析 detailObj 的 Type
if (detailObj.contains("Type")) {
if (detailObj["Type"].is_string()) {
try {
parsed.type = std::stoi(detailObj["Type"].get<std::string>(), nullptr, 0); // 支持 "0x2106" 格式
} catch (...) {
return false;
}
} else if (detailObj["Type"].is_number_integer()) {
parsed.type = detailObj["Type"].get<int>();
} else if (detailObj["Type"].is_number_unsigned()) {
parsed.type = static_cast<int>(detailObj["Type"].get<unsigned int>());
} else {
return false;
}
} else {
return false;
}
// 直接解析 detailObj 的 Msg
if (detailObj.contains("Msg") && detailObj["Msg"].is_object()) {
msgObj = detailObj["Msg"];
} else {
msgObj = nlohmann::json::object();
}
try {
switch (parsed.type) {
case 0x2131: { // 读取目录
if(!recordguid(devid,guid,static_cast<int>(DeviceState::READING_FILEMENU),1)){
return true;
}
if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false;
parsed.name = msgObj["Name"].get<std::string>();
parsed.ok = true;
std::cout << "[dir parsemsg] Name: " << parsed.name << std::endl;
// 添加指令到队列当中
ClientManager::instance().add_file_menu_action_to_device(devid, parsed.name);
return true;
}
case 0x2132: { // 下载文件
if(!recordguid(devid,guid,static_cast<int>(DeviceState::READING_FILEDATA),1)){
return true;
}
if (!msgObj.contains("Name") || !msgObj["Name"].is_string()) return false;
parsed.name = msgObj["Name"].get<std::string>();
parsed.ok = true;
std::cout << "[file parsemsg] Name: " << parsed.name << std::endl;
// 下发指令
ClientManager::instance().add_file_download_action_to_device(devid, parsed.name);
return true;
}
case 0x2106: { // 定值/内部定值
if (!msgObj.contains("Cldid") || !msgObj["Cldid"].is_number_integer()) return false;
if (!msgObj.contains("DataType") || !msgObj["DataType"].is_number_integer()) return false;
if (!msgObj.contains("Operate") || !msgObj["Operate"].is_number_integer()) return false;
if (!msgObj.contains("DataArray")|| !msgObj["DataArray"].is_array()) return false;
parsed.cldid = msgObj["Cldid"].get<int>();
parsed.datatype = msgObj["DataType"].get<int>();
parsed.operate = msgObj["Operate"].get<int>();
// 调试打印
std::cout << "[parsemsg] Cldid=" << parsed.cldid
<< ", DataType=0x" << std::hex << parsed.datatype << std::dec
<< ", Operate=" << parsed.operate
<< std::endl;
// 先清空数组,避免复用对象时残留
parsed.dataArray_f.clear();
parsed.dataArray_us.clear();
switch (parsed.datatype) {
case 0x0C: { // 定值float 阵列)
for (const auto& v : msgObj["DataArray"]) {
if (!v.is_number()) return false;
// 统一按 double 取,再强转成 float 更稳妥
parsed.dataArray_f.push_back(static_cast<float>(v.get<double>()));
}
// 打印 DataArray
std::cout << "[0x0C] DataArray=[";
for (size_t i = 0; i < parsed.dataArray_f.size(); ++i) {
std::cout << parsed.dataArray_f[i] << (i + 1 < parsed.dataArray_f.size() ? ", " : "");
}
std::cout << "]" << std::endl;
parsed.ok = true;
// 根据 Operate 分流1=读2=写)
switch (parsed.operate) {
case 1: { // 读
if(!recordguid(devid,guid,static_cast<int>(DeviceState::READING_FIXEDVALUE),2)){
return true;
}
ClientManager::instance().get_fixedvalue_action_to_device(
devid, static_cast<uint16_t>(parsed.cldid)); // 获取装置测点定值数据
ClientManager::instance().get_fixedvaluedes_action_to_device(devid); // 获取装置定值描述
break;
}
case 2: { // 写
if(!recordguid(devid,guid,static_cast<int>(DeviceState::SET_FIXEDVALUE),1)){
return true;
}
ClientManager::instance().set_fixedvalue_action_to_device(
devid, static_cast<uint16_t>(parsed.cldid), parsed.dataArray_f); // 装置修改定值
break;
}
default:
return false;
}
break;
}
case 0x0D: { // 内部定值uint16_t 阵列)
for (const auto& v : msgObj["DataArray"]) {
if (!v.is_number_integer() && !v.is_number_unsigned()) return false;
// 范围校验 [0, 65535]
long long val = v.get<long long>();
if (val < 0 || val > 65535) return false;
parsed.dataArray_us.push_back(static_cast<uint16_t>(val));
}
// 打印 DataArray
std::cout << "[0x0D] DataArray=[";
for (size_t i = 0; i < parsed.dataArray_us.size(); ++i) {
std::cout << parsed.dataArray_us[i] << (i + 1 < parsed.dataArray_us.size() ? ", " : "");
}
std::cout << "]" << std::endl;
parsed.ok = true;
// 根据 Operate 分流1=读2=写)
switch (parsed.operate) {
case 1: { // 读
if(!recordguid(devid,guid,static_cast<int>(DeviceState::READING_INTERFIXEDVALUE),3)){
return true;
}
ClientManager::instance().get_interfixedvalue_action_to_device(devid); // 获取内部定值
ClientManager::instance().get_fixedvalucontrolword_action_to_device(devid, 1); // 1-内部定值描述
ClientManager::instance().get_fixedvalucontrolword_action_to_device(devid, 2); // 2-控制字描述
break;
}
case 2: { // 写
if(!recordguid(devid,guid,static_cast<int>(DeviceState::SET_INTERFIXEDVALUE),1)){
return true;
}
ClientManager::instance().set_interfixedvalue_action_to_device(devid, parsed.dataArray_us);
break;
}
default:
return false;
}
break;
}
default:
return false;
}
return true;
}
default:
return false;
}
} catch (const std::exception& e) {
std::cerr << "[parsemsg] exception: " << e.what() << std::endl;
return false;
} catch (...) {
std::cerr << "[parsemsg] unknown exception" << std::endl;
return false;
}
}
//心跳和其他响应
void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type) {
try {
std::string guid = find_guid_index_from_dev_id(dev_id);
if(guid == "")
{
std::cerr << "dev: " << dev_id << " guid not found" << std::endl;
return;
}
// ---- 构造根 JSON ----
nlohmann::json obj;
obj["guid"] = guid;
obj["FrontIP"] = FRONT_IP;
obj["Node"] = g_front_seg_index;
// Dev_mac从台账取 addr_str 并规范化
std::string mac = get_mac_by_devid(dev_id);
obj["Dev_mac"] = mac;
// ---- 构造 Detail ----
nlohmann::json detail;
detail["Type"] = type;
// Msg
nlohmann::json msg;
msg["Time"] = static_cast<long long>(std::time(nullptr));
detail["Msg"] = std::move(msg);
// Code
detail["Code"] = reply_code;
obj["Detail"] = std::move(detail);
// ---- 入队发送 ----
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = obj.dump(); // 序列化为字符串
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
}
// 调试打印
std::cout << "[send_reply_to_cloud] queued: " << obj.dump() << std::endl;
}
catch (const std::exception& e) {
std::cerr << "send_reply_to_cloud exception: " << e.what() << std::endl;
}
}
//云前置功能
rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg) {
//未初始化不处理消费
@@ -1637,7 +1994,7 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg)
// 日志记录
DIY_INFOLOG("process", "【NORMAL】前置消费topic:%s_%s的云前置控制消息",FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_CLOUD.c_str());
std::cout << "rtdata Callback received message: " << body << std::endl;
std::cout << "cloud Callback received message: " << body << std::endl;
if (!key.empty()) {
std::cout << "Message Key: " << key << std::endl;
} else {
@@ -1645,38 +2002,57 @@ rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg)
}
// 消息解析
std::string devid, line;
bool realData = false, soeData = false;
int limit = 0;
std::string guid;
std::string devid;
std::string FrontIP;
int Node;
nlohmann::json DetailObj;
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) {
if (!parseJsonMessageCLOUD(body, devid, guid, DetailObj,FrontIP,Node)) {
std::cerr << "Failed to parse the JSON message." << std::endl;
DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的云前置控制消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
return rocketmq::RECONSUME_LATER;
}
// 加锁访问台账
int dev_index;
int mp_index;
if( !devid.empty() && !line.empty()){
std::lock_guard<std::mutex> lock(ledgermtx);
dev_index = find_dev_index_from_dev_id(devid);
mp_index = find_mp_index_from_mp_id(line);
}
else{
std::cerr << "rtdata is NULL." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的云前置控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
}
// ====== 调试打印 ======
std::cout << "[CLOUD Msg Parsed] "
<< "guid=" << guid
<< ", devid=" << devid
<< ", FrontIP=" << FrontIP
<< ", Node=" << Node
<< std::endl;
if (dev_index == -1 || mp_index == -1) {
std::cerr << "dev index or mp index is not found" << std::endl;
return rocketmq::RECONSUME_LATER;
if(FrontIP != FRONT_IP || Node != g_front_seg_index){
std::cout << "当前进程不消费这个消息" << std::endl;
return rocketmq::CONSUME_SUCCESS;
}
//不再使用文件触发方式,直接调用接口向终端发起请求
ClientManager::instance().set_real_state_count(devid, 60,mp_index);//一秒询问一次询问60次
if(!parsemsg(devid,guid,DetailObj)){
std::cerr << "clouddata is error." << std::endl;
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的云前置控制消息失败,消息无法解析", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
}
return rocketmq::CONSUME_SUCCESS;
}
}
void rocketmq_test_getdir(Front* front)//用来测试目录获取
{
if (!front || !front->m_producer) {
std::cerr << "front 或 producer 无效\n";
return;
}
rocketmq::RocketMQProducer* producer = front->m_producer;
queue_data_t data;
data.monitor_no = 123;
data.strTopic = G_MQCONSUMER_TOPIC_CLOUD;
std::ifstream file("getdir.txt"); // 文件中存储长字符串
std::stringstream buffer;
buffer << file.rdbuf(); // 读取整个文件内容
data.strText = std::string(buffer.str());
data.mp_id = "123123";
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}