add cloud mq
This commit is contained in:
@@ -1617,3 +1617,66 @@ void rocketmq_test_rc(Front* front)//用来测试补招
|
||||
queue_data_list.push_back(data);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////云前置新增功能
|
||||
|
||||
//云前置功能
|
||||
rocketmq::ConsumeStatus cloudMessageCallback(const rocketmq::MQMessageExt& msg) {
|
||||
//未初始化不处理消费
|
||||
if (INITFLAG != 1) {
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
std::string body = msg.getBody();
|
||||
std::string key = msg.getKeys();
|
||||
|
||||
if (body.empty()) {
|
||||
std::cerr << "Message body is NULL or empty." << std::endl;
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
// 日志记录
|
||||
DIY_INFOLOG("process", "【NORMAL】前置消费topic:%s_%s的云前置控制消息",FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_CLOUD.c_str());
|
||||
|
||||
std::cout << "rtdata Callback received message: " << body << std::endl;
|
||||
if (!key.empty()) {
|
||||
std::cout << "Message Key: " << key << std::endl;
|
||||
} else {
|
||||
std::cout << "Message Key: N/A" << std::endl;
|
||||
}
|
||||
|
||||
// 消息解析
|
||||
std::string devid, line;
|
||||
bool realData = false, soeData = false;
|
||||
int limit = 0;
|
||||
|
||||
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) {
|
||||
std::cerr << "Failed to parse the JSON message." << std::endl;
|
||||
DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的云前置控制消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
// 加锁访问台账
|
||||
int dev_index;
|
||||
int mp_index;
|
||||
if( !devid.empty() && !line.empty()){
|
||||
std::lock_guard<std::mutex> lock(ledgermtx);
|
||||
dev_index = find_dev_index_from_dev_id(devid);
|
||||
mp_index = find_mp_index_from_mp_id(line);
|
||||
}
|
||||
else{
|
||||
std::cerr << "rtdata is NULL." << std::endl;
|
||||
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的云前置控制消息失败,消息的json结构不正确", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
|
||||
}
|
||||
|
||||
|
||||
if (dev_index == -1 || mp_index == -1) {
|
||||
std::cerr << "dev index or mp index is not found" << std::endl;
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
}
|
||||
|
||||
|
||||
//不再使用文件触发方式,直接调用接口向终端发起请求
|
||||
ClientManager::instance().set_real_state_count(devid, 60,mp_index);//一秒询问一次,询问60次
|
||||
|
||||
return rocketmq::CONSUME_SUCCESS;
|
||||
}
|
||||
Reference in New Issue
Block a user