add tag and key

This commit is contained in:
lnk
2025-09-22 13:26:52 +08:00
parent 825315440d
commit 169aa9b34a
7 changed files with 71 additions and 14 deletions

View File

@@ -125,6 +125,10 @@ std::string TOPIC_RTDATA = "";
std::string G_ROCKETMQ_TAG = "";//tag
std::string G_ROCKETMQ_KEY = "";//key
//实时数据tagkey
std::string G_RT_TAG = "";//tag
std::string G_RT_KEY = "";//key
//生产者
std::string G_ROCKETMQ_PRODUCER = ""; //rocketmq producer
std::string G_MQPRODUCER_IPPORT = ""; //rocketmq ip+port
@@ -272,6 +276,10 @@ void loadConfig(const std::string& filename) {
strMap["Queue.QUEUE_TAG"] = &G_ROCKETMQ_TAG;
strMap["Queue.QUEUE_KEY"] = &G_ROCKETMQ_KEY;
//添加rt的tagkey
strMap["Queue.RT_TAG"] = &G_RT_TAG;
strMap["Queue.RT_KEY"] = &G_RT_KEY;
// [RocketMq] —— 生产者
strMap["RocketMq.producer"] = &G_ROCKETMQ_PRODUCER;
strMap["RocketMq.Ipport"] = &G_MQPRODUCER_IPPORT;
@@ -2922,6 +2930,8 @@ void upload_data_test(){
data.strTopic = TOPIC_ALARM;
data.strText = js;
data.mp_id = "test";
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
@@ -3314,6 +3324,8 @@ bool send_file_list(const std::string &dev_id, const std::vector<tag_dir_info> &
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = j.dump(); // 序列化为字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(std::move(connect_info));
@@ -3707,6 +3719,8 @@ bool send_set_value_reply(const std::string &dev_id, unsigned char mp_index, con
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = j.dump(); // 序列化为字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
@@ -3903,6 +3917,8 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector<DZ_k
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = j.dump(); // 序列化为字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
@@ -3968,6 +3984,8 @@ void send_reply_to_kafka_recall(const std::string& guid, const std::string& step
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = jsonString;
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
// 加入发送队列(带互斥锁保护)
queue_data_list_mutex.lock();

View File

@@ -265,6 +265,8 @@ public:
std::string strTopic; //发送topic
std::string strText; //发送的json字符串
std::string mp_id; //监测点id
std::string tag; //消息tag
std::string key; // 消息key
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@@ -163,6 +163,8 @@ protected:
queue_data_t connect_info;
connect_info.strTopic = G_LOG_TOPIC;
connect_info.strText = jsonString;
connect_info.tag = G_LOG_TAG;
connect_info.key = G_LOG_KEY;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(connect_info);

View File

@@ -266,14 +266,16 @@ void ShutdownAndDestroyProducer()
// 使用 C++ 接口封装的 RocketMQProducer 类
void rocketmq_producer_send(rocketmq::RocketMQProducer* producer,
const std::string& body,
const std::string& topic) {
const std::string& topic,
const std::string& tags,
const std::string& keys) {
if (!producer) {
std::cerr << "[rocketmq_producer_send] producer 不可用,未初始化\n";
return;
}
const std::string& tags = G_ROCKETMQ_TAG;
const std::string& keys = G_ROCKETMQ_KEY;
//const std::string& tags = G_ROCKETMQ_TAG;
//const std::string& keys = G_ROCKETMQ_KEY;
try {
producer->sendMessage(body, topic, tags, keys);
@@ -306,7 +308,9 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer)
init = true;
}
std::string key = data.mp_id;
std::string key = data.key;
std::string tag = data.tag;
std::string senddata = data.strText;
if (data.strTopic == "HISDATA")
{
@@ -334,7 +338,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer)
}
rocketmq_producer_send(producer,senddata,topic);
rocketmq_producer_send(producer,senddata,topic,tag,key);
}
/////////////////////////////////////////////////////////////////////////////////////////////////查找台账下标
@@ -1247,7 +1251,7 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json
////////////////////////////////////////////////////////////////////////////////////////////////////////////////终端连接消息
void connect_status_to_queue(const std::string& id, const std::string& datetime, int status)
void connect_status_to_queue(const std::string& id, const std::string& datetime, int status)//这个不使用,使用新的带有时间封装的
{
try {
// 构造 JSON
@@ -1260,6 +1264,8 @@ void connect_status_to_queue(const std::string& id, const std::string& datetime,
queue_data_t data;
data.strTopic = G_CONNECT_TOPIC;
data.strText = jsonObject.dump(); // 转换为字符串
data.tag = G_CONNECT_TAG;
data.key = G_CONNECT_KEY;
//if (g_node_id == STAT_DATA_BASE_NODE_ID) {
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
@@ -1288,6 +1294,8 @@ void send_reply_to_queue(const std::string& guid, const std::string& step, const
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = obj.dump(); // 序列化为 JSON 字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
// 加入发送队列(线程安全)
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
@@ -1311,6 +1319,8 @@ void send_heartbeat_to_queue(const std::string& status) {
queue_data_t connect_info;
connect_info.strTopic = Heart_Beat_Topic;
connect_info.strText = obj.dump(); // 紧凑格式 JSON
connect_info.tag = Heart_Beat_Tag;
connect_info.key = Heart_Beat_Key;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(connect_info);
@@ -1412,6 +1422,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) {
}
data.strText = modified_strText;
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
@@ -1449,6 +1461,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) {
}
data.strText = modified_strText;
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
@@ -1482,7 +1496,8 @@ void rocketmq_test_rt(Front* front)//用来测试实时数据
data.strText = std::string(buffer.str());
data.mp_id = "123123";
//my_rocketmq_send(data,front->m_producer);
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
@@ -1505,7 +1520,8 @@ void rocketmq_test_ud(Front* front)//用来测试台账更新
data.strText = std::string(buffer.str());
data.mp_id = "123123";
//my_rocketmq_send(data,front->m_producer);
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
@@ -1528,7 +1544,8 @@ void rocketmq_test_set(Front* front)//用来测试进程控制脚本
data.strText = std::string(buffer.str());
data.mp_id = "123123";
//my_rocketmq_send(data,front->m_producer);
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
@@ -1551,7 +1568,8 @@ void rocketmq_test_rc(Front* front)//用来测试补招
data.strText = std::string(buffer.str());
data.mp_id = "123123";
//my_rocketmq_send(data,front->m_producer);
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
@@ -1890,6 +1908,8 @@ void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type) {
queue_data_t connect_info;
connect_info.strTopic = Topic_Reply_Topic;
connect_info.strText = obj.dump(); // 序列化为字符串
connect_info.tag = Topic_Reply_Tag;
connect_info.key = Topic_Reply_Key;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
@@ -1981,6 +2001,8 @@ void rocketmq_test_getdir(Front* front)//用来测试目录获取
data.strText = std::string(buffer.str());
data.mp_id = "123123";
data.tag = G_ROCKETMQ_TAG_TEST;
data.key = G_ROCKETMQ_KEY_TEST;
std::lock_guard<std::mutex> lock(queue_data_list_mutex);
queue_data_list.push_back(data);
}
@@ -2008,6 +2030,8 @@ void connect_status_update(const std::string& id, int status)
queue_data_t connect_info;
connect_info.strTopic = G_CONNECT_TOPIC;
connect_info.strText = j.dump(); // 转成字符串
connect_info.tag = G_CONNECT_TAG;
connect_info.key = G_CONNECT_KEY;
{
std::lock_guard<std::mutex> lock(queue_data_list_mutex);

View File

@@ -60,6 +60,10 @@ extern std::string TOPIC_RTDATA;
extern std::string G_ROCKETMQ_TAG;
extern std::string G_ROCKETMQ_KEY;
//添加rt的tagkey
extern std::string G_RT_TAG;
extern std::string G_RT_KEY;
extern std::string G_ROCKETMQ_CONSUMER;
extern std::string G_MQCONSUMER_IPPORT;
extern std::string G_MQCONSUMER_ACCESSKEY;