diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index f89354d..377e454 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -125,6 +125,10 @@ std::string TOPIC_RTDATA = ""; std::string G_ROCKETMQ_TAG = "";//tag std::string G_ROCKETMQ_KEY = "";//key +//实时数据tagkey +std::string G_RT_TAG = "";//tag +std::string G_RT_KEY = "";//key + //生产者 std::string G_ROCKETMQ_PRODUCER = ""; //rocketmq producer std::string G_MQPRODUCER_IPPORT = ""; //rocketmq ip+port @@ -272,6 +276,10 @@ void loadConfig(const std::string& filename) { strMap["Queue.QUEUE_TAG"] = &G_ROCKETMQ_TAG; strMap["Queue.QUEUE_KEY"] = &G_ROCKETMQ_KEY; + //添加rt的tagkey + strMap["Queue.RT_TAG"] = &G_RT_TAG; + strMap["Queue.RT_KEY"] = &G_RT_KEY; + // [RocketMq] —— 生产者 strMap["RocketMq.producer"] = &G_ROCKETMQ_PRODUCER; strMap["RocketMq.Ipport"] = &G_MQPRODUCER_IPPORT; @@ -2922,6 +2930,8 @@ void upload_data_test(){ data.strTopic = TOPIC_ALARM; data.strText = js; data.mp_id = "test"; + data.tag = G_ROCKETMQ_TAG_TEST; + data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } @@ -3314,6 +3324,8 @@ bool send_file_list(const std::string &dev_id, const std::vector & queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = j.dump(); // 序列化为字符串 + connect_info.tag = Topic_Reply_Tag; + connect_info.key = Topic_Reply_Key; { std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(std::move(connect_info)); @@ -3707,6 +3719,8 @@ bool send_set_value_reply(const std::string &dev_id, unsigned char mp_index, con queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = j.dump(); // 序列化为字符串 + connect_info.tag = Topic_Reply_Tag; + connect_info.key = Topic_Reply_Key; { std::lock_guard lock(queue_data_list_mutex); @@ -3903,6 +3917,8 @@ bool send_internal_value_reply(const std::string &dev_id, const std::vector lock(queue_data_list_mutex); @@ -3968,6 +3984,8 @@ void send_reply_to_kafka_recall(const std::string& guid, const std::string& step queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = jsonString; + connect_info.tag = Topic_Reply_Tag; + connect_info.key = Topic_Reply_Key; // 加入发送队列(带互斥锁保护) queue_data_list_mutex.lock(); diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index ae8b485..6ac301a 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -265,6 +265,8 @@ public: std::string strTopic; //发送topic std::string strText; //发送的json字符串 std::string mp_id; //监测点id + std::string tag; //消息tag + std::string key; // 消息key }; ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index b222dac..9184ee0 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -163,6 +163,8 @@ protected: queue_data_t connect_info; connect_info.strTopic = G_LOG_TOPIC; connect_info.strText = jsonString; + connect_info.tag = G_LOG_TAG; + connect_info.key = G_LOG_KEY; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(connect_info); diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 9f63efb..870c355 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -266,14 +266,16 @@ void ShutdownAndDestroyProducer() // 使用 C++ 接口封装的 RocketMQProducer 类 void rocketmq_producer_send(rocketmq::RocketMQProducer* producer, const std::string& body, - const std::string& topic) { + const std::string& topic, + const std::string& tags, + const std::string& keys) { if (!producer) { std::cerr << "[rocketmq_producer_send] producer 不可用,未初始化\n"; return; } - const std::string& tags = G_ROCKETMQ_TAG; - const std::string& keys = G_ROCKETMQ_KEY; + //const std::string& tags = G_ROCKETMQ_TAG; + //const std::string& keys = G_ROCKETMQ_KEY; try { producer->sendMessage(body, topic, tags, keys); @@ -306,7 +308,9 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer) init = true; } - std::string key = data.mp_id; + std::string key = data.key; + std::string tag = data.tag; + std::string senddata = data.strText; if (data.strTopic == "HISDATA") { @@ -334,7 +338,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer) } - rocketmq_producer_send(producer,senddata,topic); + rocketmq_producer_send(producer,senddata,topic,tag,key); } /////////////////////////////////////////////////////////////////////////////////////////////////查找台账下标 @@ -1247,7 +1251,7 @@ std::string prepare_update(const std::string& code_str, const terminal_dev& json ////////////////////////////////////////////////////////////////////////////////////////////////////////////////终端连接消息 -void connect_status_to_queue(const std::string& id, const std::string& datetime, int status) +void connect_status_to_queue(const std::string& id, const std::string& datetime, int status)//这个不使用,使用新的带有时间封装的 { try { // 构造 JSON @@ -1260,6 +1264,8 @@ void connect_status_to_queue(const std::string& id, const std::string& datetime, queue_data_t data; data.strTopic = G_CONNECT_TOPIC; data.strText = jsonObject.dump(); // 转换为字符串 + data.tag = G_CONNECT_TAG; + data.key = G_CONNECT_KEY; //if (g_node_id == STAT_DATA_BASE_NODE_ID) { std::lock_guard lock(queue_data_list_mutex); @@ -1288,6 +1294,8 @@ void send_reply_to_queue(const std::string& guid, const std::string& step, const queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = obj.dump(); // 序列化为 JSON 字符串 + connect_info.tag = Topic_Reply_Tag; + connect_info.key = Topic_Reply_Key; // 加入发送队列(线程安全) std::lock_guard lock(queue_data_list_mutex); @@ -1311,6 +1319,8 @@ void send_heartbeat_to_queue(const std::string& status) { queue_data_t connect_info; connect_info.strTopic = Heart_Beat_Topic; connect_info.strText = obj.dump(); // 紧凑格式 JSON + connect_info.tag = Heart_Beat_Tag; + connect_info.key = Heart_Beat_Key; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(connect_info); @@ -1412,6 +1422,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { } data.strText = modified_strText; + data.tag = G_ROCKETMQ_TAG_TEST; + data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); @@ -1449,6 +1461,8 @@ void rocketmq_test_300(int mpnum, int front_index, int type, Front* front) { } data.strText = modified_strText; + data.tag = G_ROCKETMQ_TAG_TEST; + data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); @@ -1482,7 +1496,8 @@ void rocketmq_test_rt(Front* front)//用来测试实时数据 data.strText = std::string(buffer.str()); data.mp_id = "123123"; - //my_rocketmq_send(data,front->m_producer); + data.tag = G_ROCKETMQ_TAG_TEST; + data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } @@ -1505,7 +1520,8 @@ void rocketmq_test_ud(Front* front)//用来测试台账更新 data.strText = std::string(buffer.str()); data.mp_id = "123123"; - //my_rocketmq_send(data,front->m_producer); + data.tag = G_ROCKETMQ_TAG_TEST; + data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } @@ -1528,7 +1544,8 @@ void rocketmq_test_set(Front* front)//用来测试进程控制脚本 data.strText = std::string(buffer.str()); data.mp_id = "123123"; - //my_rocketmq_send(data,front->m_producer); + data.tag = G_ROCKETMQ_TAG_TEST; + data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } @@ -1551,7 +1568,8 @@ void rocketmq_test_rc(Front* front)//用来测试补招 data.strText = std::string(buffer.str()); data.mp_id = "123123"; - //my_rocketmq_send(data,front->m_producer); + data.tag = G_ROCKETMQ_TAG_TEST; + data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } @@ -1890,6 +1908,8 @@ void send_reply_to_cloud(int reply_code, const std::string& dev_id, int type) { queue_data_t connect_info; connect_info.strTopic = Topic_Reply_Topic; connect_info.strText = obj.dump(); // 序列化为字符串 + connect_info.tag = Topic_Reply_Tag; + connect_info.key = Topic_Reply_Key; { std::lock_guard lock(queue_data_list_mutex); @@ -1981,6 +2001,8 @@ void rocketmq_test_getdir(Front* front)//用来测试目录获取 data.strText = std::string(buffer.str()); data.mp_id = "123123"; + data.tag = G_ROCKETMQ_TAG_TEST; + data.key = G_ROCKETMQ_KEY_TEST; std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); } @@ -2008,6 +2030,8 @@ void connect_status_update(const std::string& id, int status) queue_data_t connect_info; connect_info.strTopic = G_CONNECT_TOPIC; connect_info.strText = j.dump(); // 转成字符串 + connect_info.tag = G_CONNECT_TAG; + connect_info.key = G_CONNECT_KEY; { std::lock_guard lock(queue_data_list_mutex); diff --git a/LFtid1056/cloudfront/code/rocketmq.h b/LFtid1056/cloudfront/code/rocketmq.h index 4b4e493..71e9f1c 100644 --- a/LFtid1056/cloudfront/code/rocketmq.h +++ b/LFtid1056/cloudfront/code/rocketmq.h @@ -60,6 +60,10 @@ extern std::string TOPIC_RTDATA; extern std::string G_ROCKETMQ_TAG; extern std::string G_ROCKETMQ_KEY; +//添加rt的tagkey +extern std::string G_RT_TAG; +extern std::string G_RT_KEY; + extern std::string G_ROCKETMQ_CONSUMER; extern std::string G_MQCONSUMER_IPPORT; extern std::string G_MQCONSUMER_ACCESSKEY; diff --git a/LFtid1056/config/front.cfg b/LFtid1056/config/front.cfg index 0777f28..7636207 100644 --- a/LFtid1056/config/front.cfg +++ b/LFtid1056/config/front.cfg @@ -11,6 +11,9 @@ SngTopic=SngTopic QUEUE_TAG=stat QUEUE_KEY=stat +RT_TAG=rt +RT_KEY=rt + [Flag] FrontInst= FrontIP=192.168.1.138 diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 269c0c9..4326d5e 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -441,10 +441,12 @@ void process_received_message(string mac, string id,const char* data, size_t len //} queue_data_t data; - data.monitor_no = avg_data.name; // + data.monitor_no = avg_data.name; // data.strTopic = TOPIC_STAT;//ͳtopic data.strText = js; - data.mp_id = "test"; // + data.mp_id = ""; //idʱҪ + data.tag = G_ROCKETMQ_TAG; //ͳtag + data.key = G_ROCKETMQ_KEY; //ͳkey std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data); @@ -614,10 +616,12 @@ void process_received_message(string mac, string id,const char* data, size_t len ); //std::cout << js << std::en queue_data_t data; - data.monitor_no = 1; // + data.monitor_no = 1; //͵ʵʱûвţͳһ1 data.strTopic = TOPIC_RTDATA; //ʵʱtopic data.strText = js; - data.mp_id = "test"; // + data.mp_id = ""; //idʱҪ + data.tag = G_RT_TAG; //ʵʱtag + data.key = G_RT_KEY; //ʵʱkey std::lock_guard lock(queue_data_list_mutex); queue_data_list.push_back(data);