Compare commits
8 Commits
armtest1.0
...
develop
| Author | SHA1 | Date | |
|---|---|---|---|
| c388bd04fe | |||
| c6ca57a204 | |||
| 2f584fda30 | |||
| 2f2e0d6430 | |||
| fc861024c3 | |||
| 3c98bf7eae | |||
| 073c98e89a | |||
| b1d8440e6a |
@@ -29,10 +29,14 @@
|
|||||||
#include "../rocketmq/DefaultMQPushConsumer.h"
|
#include "../rocketmq/DefaultMQPushConsumer.h"
|
||||||
#include "../rocketmq/ConsumeType.h"
|
#include "../rocketmq/ConsumeType.h"
|
||||||
|
|
||||||
|
#include "../rocketmq/MQMessageListener.h"
|
||||||
|
#include "../rocketmq/MQMessageExt.h"
|
||||||
|
#include "../rocketmq/SessionCredentials.h"
|
||||||
|
|
||||||
// 引入提供的消费者接口头文件
|
// 引入提供的消费者接口头文件
|
||||||
#include "../rocketmq/CPushConsumer.h"
|
//#include "../rocketmq/CPushConsumer.h"
|
||||||
#include "../rocketmq/CCommon.h"
|
//#include "../rocketmq/CCommon.h"
|
||||||
#include "../rocketmq/CMessageExt.h"
|
//#include "../rocketmq/CMessageExt.h"
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <pthread.h> // 用于互斥锁(在 C++98 中没有 std::mutex)
|
#include <pthread.h> // 用于互斥锁(在 C++98 中没有 std::mutex)
|
||||||
#include <utility> // for std::pair
|
#include <utility> // for std::pair
|
||||||
@@ -41,11 +45,15 @@
|
|||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
static std::once_flag g_consumer_once;
|
||||||
|
static std::once_flag g_producer_once;
|
||||||
|
|
||||||
extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer
|
extern std::string G_ROCKETMQ_PRODUCER;//rocketmq producer
|
||||||
extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port
|
extern std::string G_ROCKETMQ_IPPORT;//rocketmq ip+port
|
||||||
extern std::string G_ROCKETMQ_TOPIC;//topie
|
|
||||||
extern std::string G_ROCKETMQ_TAG;//tag
|
extern std::string G_ROCKETMQ_TOPIC_TEST;//topie
|
||||||
extern std::string G_ROCKETMQ_KEY;//key
|
extern std::string G_ROCKETMQ_TAG_TEST;//tag
|
||||||
|
extern std::string G_ROCKETMQ_KEY_TEST;//key
|
||||||
|
|
||||||
extern std::string G_MQCONSUMER_TOPIC_LOG;
|
extern std::string G_MQCONSUMER_TOPIC_LOG;
|
||||||
extern std::string G_MQCONSUMER_TOPIC_SET;
|
extern std::string G_MQCONSUMER_TOPIC_SET;
|
||||||
@@ -53,6 +61,8 @@ extern std::string G_MQCONSUMER_TOPIC_RC;
|
|||||||
extern std::string G_MQCONSUMER_TOPIC_UD;
|
extern std::string G_MQCONSUMER_TOPIC_UD;
|
||||||
extern std::string G_MQCONSUMER_TOPIC_RT;
|
extern std::string G_MQCONSUMER_TOPIC_RT;
|
||||||
|
|
||||||
|
extern std::string G_MQCONSUMER_TOPIC_TEST;
|
||||||
|
|
||||||
extern std::string FRONT_INST;
|
extern std::string FRONT_INST;
|
||||||
extern bool DEBUGOPEN;
|
extern bool DEBUGOPEN;
|
||||||
|
|
||||||
@@ -79,20 +89,40 @@ extern std::string G_MQCONSUMER_CHANNEL;
|
|||||||
class RocketMQConsumer;
|
class RocketMQConsumer;
|
||||||
|
|
||||||
// 全局映射:CPushConsumer* -> RocketMQConsumer*
|
// 全局映射:CPushConsumer* -> RocketMQConsumer*
|
||||||
std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;//
|
//std::map<CPushConsumer*, RocketMQConsumer*> g_consumerMap;//
|
||||||
pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER;
|
//pthread_mutex_t g_consumerMapMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
|
////////////////////////////////
|
||||||
|
int64_t G_APP_START_MS = []() -> int64_t {
|
||||||
|
using namespace std::chrono;
|
||||||
|
return duration_cast<milliseconds>(
|
||||||
|
system_clock::now().time_since_epoch()
|
||||||
|
).count();
|
||||||
|
}();
|
||||||
|
|
||||||
|
int64_t G_START_SKEW_MS = 1000;
|
||||||
|
|
||||||
|
bool should_process_after_start(const rocketmq::MQMessageExt& msg)
|
||||||
|
{
|
||||||
|
const int64_t born_ts = static_cast<int64_t>(msg.getBornTimestamp());
|
||||||
|
return born_ts >= (G_APP_START_MS - G_START_SKEW_MS);
|
||||||
|
}
|
||||||
|
///////////////////////////////
|
||||||
|
class InternalListener;
|
||||||
|
|
||||||
class RocketMQConsumer {
|
class RocketMQConsumer {
|
||||||
public:
|
public:
|
||||||
// 构造函数:初始化消费者并启动
|
// 构造函数:初始化消费者并启动
|
||||||
RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId);
|
RocketMQConsumer(const std::string& consumerName, const std::string& nameServer);
|
||||||
|
|
||||||
// 禁用拷贝和赋值
|
// 禁用拷贝和赋值
|
||||||
RocketMQConsumer(const RocketMQConsumer&) {}
|
RocketMQConsumer(const RocketMQConsumer&) = delete;
|
||||||
RocketMQConsumer& operator=(const RocketMQConsumer&) { return *this; }
|
RocketMQConsumer& operator=(const RocketMQConsumer&) = delete;
|
||||||
|
|
||||||
// 订阅主题和标签,并注册回调
|
// 订阅主题和标签,并注册回调
|
||||||
void subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback);
|
void subscribe(const std::string& topic,
|
||||||
|
const std::string& tag,
|
||||||
|
MessageCallBack callback);
|
||||||
|
|
||||||
// 启动消费者
|
// 启动消费者
|
||||||
void start();
|
void start();
|
||||||
@@ -100,23 +130,47 @@ public:
|
|||||||
//修改消费模式
|
//修改消费模式
|
||||||
void setConsumerMessageModel(const std::string& topic);
|
void setConsumerMessageModel(const std::string& topic);
|
||||||
|
|
||||||
|
rocketmq::ConsumeStatus handleMessage(const rocketmq::MQMessageExt& msg);
|
||||||
|
|
||||||
// 析构函数:关闭并销毁消费者
|
// 析构函数:关闭并销毁消费者
|
||||||
~RocketMQConsumer();
|
~RocketMQConsumer();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
CPushConsumer* consumer_; // C 接口消费者指针
|
//CPushConsumer* consumer_; // C 接口消费者指针
|
||||||
//MessageCallBack messageCallback_; // 函数指针用于回调
|
//MessageCallBack messageCallback_; // 函数指针用于回调
|
||||||
|
rocketmq::DefaultMQPushConsumer consumer_;
|
||||||
|
InternalListener* listener_;
|
||||||
|
|
||||||
std::map<std::pair<std::string, std::string>, MessageCallBack> callbacks_; // 订阅到回调的映射
|
std::map<std::pair<std::string, std::string>, MessageCallBack> callbacks_; // 订阅到回调的映射
|
||||||
|
|
||||||
// 静态消息处理回调
|
/*// 静态消息处理回调
|
||||||
static int messageHandler(CPushConsumer* consumer, CMessageExt* msg);
|
static int messageHandler(CPushConsumer* consumer, CMessageExt* msg);
|
||||||
|
|
||||||
// 实例消息处理函数
|
// 实例消息处理函数
|
||||||
int handleMessage(CMessageExt* msg);
|
int handleMessage(CMessageExt* msg);*/
|
||||||
};
|
|
||||||
|
|
||||||
// 构造函数实现
|
};
|
||||||
RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId)
|
class InternalListener : public rocketmq::MessageListenerConcurrently {
|
||||||
|
public:
|
||||||
|
explicit InternalListener(RocketMQConsumer* owner)
|
||||||
|
: owner_(owner) {}
|
||||||
|
|
||||||
|
rocketmq::ConsumeStatus consumeMessage(
|
||||||
|
const std::vector<rocketmq::MQMessageExt>& msgs) override
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < msgs.size(); ++i) {
|
||||||
|
rocketmq::ConsumeStatus ret = owner_->handleMessage(msgs[i]);
|
||||||
|
if (ret != rocketmq::CONSUME_SUCCESS) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rocketmq::CONSUME_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
RocketMQConsumer* owner_;
|
||||||
|
};
|
||||||
|
// 构造函数实现C
|
||||||
|
/*RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::string& nameServer, const std::string& groupId)
|
||||||
: consumer_(NULL)//, messageCallback_(NULL)
|
: consumer_(NULL)//, messageCallback_(NULL)
|
||||||
{
|
{
|
||||||
// 创建消费者
|
// 创建消费者
|
||||||
@@ -163,35 +217,58 @@ RocketMQConsumer::RocketMQConsumer(const std::string& consumerName, const std::s
|
|||||||
pthread_mutex_unlock(&g_consumerMapMutex);
|
pthread_mutex_unlock(&g_consumerMapMutex);
|
||||||
|
|
||||||
std::cout << "RocketMQ Consumer initialized and started." << std::endl;
|
std::cout << "RocketMQ Consumer initialized and started." << std::endl;
|
||||||
|
}*/
|
||||||
|
|
||||||
|
//构造函数实现C++
|
||||||
|
RocketMQConsumer::RocketMQConsumer(const std::string& consumerGroup,
|
||||||
|
const std::string& nameServer)
|
||||||
|
: consumer_(consumerGroup),
|
||||||
|
listener_(NULL)
|
||||||
|
{
|
||||||
|
consumer_.setNamesrvAddr(nameServer);
|
||||||
|
|
||||||
|
consumer_.setSessionCredentials(
|
||||||
|
G_MQCONSUMER_ACCESSKEY,
|
||||||
|
G_MQCONSUMER_SECRETKEY,
|
||||||
|
G_MQCONSUMER_CHANNEL
|
||||||
|
);
|
||||||
|
|
||||||
|
// 限制消费线程池,防止 ConsumeTP 爆炸
|
||||||
|
consumer_.setConsumeThreadCount(4);
|
||||||
|
|
||||||
|
listener_ = new InternalListener(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 启动消费者
|
// 启动消费者
|
||||||
void RocketMQConsumer::start()
|
void RocketMQConsumer::start()
|
||||||
{
|
{
|
||||||
if (StartPushConsumer(consumer_) != 0) {
|
static bool started = false;
|
||||||
pthread_mutex_lock(&g_consumerMapMutex);
|
if (started) {
|
||||||
g_consumerMap.erase(consumer_);
|
std::cout << "Consumer already started" << std::endl;
|
||||||
pthread_mutex_unlock(&g_consumerMapMutex);
|
return;
|
||||||
DestroyPushConsumer(consumer_);
|
|
||||||
throw std::runtime_error("Failed to start push consumer.");
|
|
||||||
}
|
|
||||||
else{
|
|
||||||
std::cout << "RocketMQ Consumer started." << std::endl;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
consumer_.registerMessageListener(listener_);
|
||||||
|
consumer_.start();
|
||||||
|
|
||||||
|
started = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback)
|
void RocketMQConsumer::subscribe(const std::string& topic, const std::string& tag, MessageCallBack callback)
|
||||||
{
|
{
|
||||||
if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) {
|
/*if (Subscribe(consumer_, topic.c_str(), tag.c_str()) != 0) {
|
||||||
throw std::runtime_error("Failed to subscribe to topic/tag.");
|
throw std::runtime_error("Failed to subscribe to topic/tag.");
|
||||||
}
|
}*/
|
||||||
|
consumer_.subscribe(topic, tag);
|
||||||
|
|
||||||
|
//调试用
|
||||||
std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl;
|
std::cout << "Subscribed to topic: " << topic << ", tag: " << tag << std::endl;
|
||||||
|
|
||||||
// 使用 std::pair 作为键
|
// 使用 std::pair 作为键
|
||||||
std::pair<std::string, std::string> key(topic, tag);
|
std::pair<std::string, std::string> mapKey(topic, tag);
|
||||||
callbacks_[key] = callback;
|
callbacks_[mapKey] = callback;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
// 静态消息处理回调实现
|
// 静态消息处理回调实现
|
||||||
int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
|
int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
|
||||||
{
|
{
|
||||||
@@ -212,60 +289,58 @@ int RocketMQConsumer::messageHandler(CPushConsumer* consumer, CMessageExt* msg)
|
|||||||
return instance->handleMessage(msg);
|
return instance->handleMessage(msg);
|
||||||
} else {
|
} else {
|
||||||
std::cerr << "Consumer instance not found for callback." << std::endl;
|
std::cerr << "Consumer instance not found for callback." << std::endl;
|
||||||
return E_RECONSUME_LATER; // 默认返回重试状态
|
//return E_RECONSUME_LATER; // 默认返回重试状态
|
||||||
|
return rocketmq::RECONSUME_LATER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
int RocketMQConsumer::handleMessage(CMessageExt* msg)
|
/*int RocketMQConsumer::handleMessage(CMessageExt* msg)
|
||||||
{
|
{
|
||||||
// 检查 msg 和 consumer_ 是否为 NULL
|
// 检查 msg 和 consumer_ 是否为 NULL
|
||||||
if (!msg || !consumer_) {
|
if (!msg || !consumer_) {
|
||||||
std::cerr << "Received null message or consumer." << std::endl;
|
std::cerr << "Received null message or consumer." << std::endl;
|
||||||
return E_RECONSUME_LATER;
|
//return E_RECONSUME_LATER;
|
||||||
}
|
return rocketmq::RECONSUME_LATER;
|
||||||
|
}*/
|
||||||
|
|
||||||
// 获取消息的主题和标签
|
// 获取消息的主题和标签
|
||||||
std::string topic = GetMessageTopic(msg); // 假设存在此函数
|
//std::string topic = GetMessageTopic(msg); // 假设存在此函数
|
||||||
std::string tag = GetMessageTags(msg); // 假设存在此函数
|
//std::string tag = GetMessageTags(msg); // 假设存在此函数
|
||||||
|
|
||||||
|
rocketmq::ConsumeStatus RocketMQConsumer::handleMessage( const rocketmq::MQMessageExt& msg) {
|
||||||
|
std::string tag = msg.getTags();
|
||||||
|
std::string topic = msg.getTopic();
|
||||||
// 打印调试信息
|
// 打印调试信息
|
||||||
std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl;
|
std::cout << "Handling message for topic: " << topic << ", tag: " << tag << std::endl;
|
||||||
|
|
||||||
// 使用 std::pair 作为键
|
// 使用 std::pair 作为键
|
||||||
std::pair<std::string, std::string> key(topic, tag);
|
std::pair<std::string, std::string> key(topic, tag);
|
||||||
|
|
||||||
// 查找对应的回调函数
|
// 查找对应的回调函数
|
||||||
std::map<std::pair<std::string, std::string>, MessageCallBack>::iterator it = callbacks_.find(key);
|
std::map<std::pair<std::string, std::string>, MessageCallBack>::iterator it = callbacks_.find(key);
|
||||||
if (it != callbacks_.end()) {
|
if (it != callbacks_.end())
|
||||||
// 调用对应的回调函数
|
{ // 调用对应的回调函数
|
||||||
|
|
||||||
//调试
|
//调试
|
||||||
std::cout << "callback Handling message " <<std::endl;
|
std::cout << "callback Handling message " <<std::endl;
|
||||||
|
//return it->second(consumer_, msg);
|
||||||
return it->second(consumer_, msg);
|
return it->second(msg);
|
||||||
|
}
|
||||||
} else {
|
else {
|
||||||
|
|
||||||
//调试
|
//调试
|
||||||
std::cout << "there is no callback " <<std::endl;
|
std::cout << "there is no callback " <<std::endl;
|
||||||
|
|
||||||
// 如果没有找到对应的回调,执行默认处理
|
// 如果没有找到对应的回调,执行默认处理
|
||||||
const char* body = GetMessageBody(msg);
|
//const char* body = GetMessageBody(msg);
|
||||||
const char* msgKey = GetMessageKeys(msg);
|
//const char* msgKey = GetMessageKeys(msg);
|
||||||
|
std::string body = msg.getBody();
|
||||||
if (body) {
|
std::string msgKey = msg.getKeys();
|
||||||
|
if (!body.empty()) {
|
||||||
std::cout << "Received message body: " << body << std::endl;
|
std::cout << "Received message body: " << body << std::endl;
|
||||||
} else {
|
} else {
|
||||||
std::cout << "Received message with empty body." << std::endl;
|
std::cout << "Received message with empty body." << std::endl;
|
||||||
}
|
} if (!msgKey.empty()) {
|
||||||
|
|
||||||
if (msgKey) {
|
|
||||||
std::cout << "Message Key: " << msgKey << std::endl;
|
std::cout << "Message Key: " << msgKey << std::endl;
|
||||||
} else {
|
} else {
|
||||||
std::cout << "Message Key: N/A" << std::endl;
|
std::cout << "Message Key: N/A" << std::endl;
|
||||||
}
|
} //return E_CONSUME_SUCCESS;
|
||||||
|
return rocketmq::CONSUME_SUCCESS;
|
||||||
return E_CONSUME_SUCCESS;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -273,7 +348,7 @@ int RocketMQConsumer::handleMessage(CMessageExt* msg)
|
|||||||
// 析构函数实现
|
// 析构函数实现
|
||||||
RocketMQConsumer::~RocketMQConsumer()
|
RocketMQConsumer::~RocketMQConsumer()
|
||||||
{
|
{
|
||||||
if (consumer_) {
|
/*if (consumer_) {
|
||||||
// 关闭消费者
|
// 关闭消费者
|
||||||
ShutdownPushConsumer(consumer_);
|
ShutdownPushConsumer(consumer_);
|
||||||
|
|
||||||
@@ -287,7 +362,18 @@ RocketMQConsumer::~RocketMQConsumer()
|
|||||||
consumer_ = NULL;
|
consumer_ = NULL;
|
||||||
|
|
||||||
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
|
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
|
||||||
|
}*/
|
||||||
|
try {
|
||||||
|
consumer_.shutdown();
|
||||||
|
} catch (...) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sleep(1); // 等内部线程退出
|
||||||
|
|
||||||
|
delete listener_;
|
||||||
|
listener_ = NULL;
|
||||||
|
|
||||||
|
std::cout << "RocketMQ Consumer shutdown and destroyed." << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 在 RocketMQConsumer 类中新增函数用来设置消费模式
|
// 在 RocketMQConsumer 类中新增函数用来设置消费模式
|
||||||
@@ -302,11 +388,13 @@ void RocketMQConsumer::setConsumerMessageModel(const std::string& topic)
|
|||||||
}
|
}
|
||||||
} else*/ {
|
} else*/ {
|
||||||
// 默认设置为广播消费模式
|
// 默认设置为广播消费模式
|
||||||
if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) {
|
/*if (SetPushConsumerMessageModel(consumer_, BROADCASTING) != 0) {
|
||||||
std::cout << "Error setting message model to BROADCASTING for topic: " << topic << std::endl;
|
std::cout << "Error setting message model to BROADCASTING for topic: " << topic << std::endl;
|
||||||
} else {
|
} else {
|
||||||
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
|
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
|
||||||
}
|
}*/
|
||||||
|
consumer_.setMessageModel(rocketmq::BROADCASTING);
|
||||||
|
std::cout << "Set consumer to BROADCASTING for topic: " << topic << std::endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -322,7 +410,8 @@ void InitializeConsumer(
|
|||||||
if (g_consumer == NULL) {
|
if (g_consumer == NULL) {
|
||||||
std::cout << "create new consumer!" << std::endl;
|
std::cout << "create new consumer!" << std::endl;
|
||||||
try {
|
try {
|
||||||
g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组,不同进程(不同的消费者)同时消费topic的同一条消息
|
//g_consumer = new RocketMQConsumer(consumerName, nameServer,consumerName);//用消费名作为消费组,不同进程(不同的消费者)同时消费topic的同一条消息
|
||||||
|
g_consumer = new RocketMQConsumer(consumerName, nameServer);
|
||||||
|
|
||||||
for (size_t i = 0; i < subscriptions.size(); ++i) {
|
for (size_t i = 0; i < subscriptions.size(); ++i) {
|
||||||
g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式
|
g_consumer->setConsumerMessageModel(subscriptions[i].topic);//初始化时根据topic设置消费模式
|
||||||
@@ -354,16 +443,13 @@ void rocketmq_consumer_receive(
|
|||||||
const std::string& nameServer,
|
const std::string& nameServer,
|
||||||
const std::vector<Subscription>& subscriptions) // 接收多个订阅
|
const std::vector<Subscription>& subscriptions) // 接收多个订阅
|
||||||
{
|
{
|
||||||
if (g_consumer == NULL) {
|
std::call_once(g_consumer_once, [&](){
|
||||||
try {
|
try {
|
||||||
//InitializeConsumer(consumerName, nameServer, topic, tag, callback);//初始化后,mq库内部来完成消息的获取
|
InitializeConsumer(consumerName, nameServer, subscriptions);
|
||||||
InitializeConsumer(consumerName, nameServer, subscriptions); // 初始化后,MQ库内部开始获取消息
|
} catch (...) {
|
||||||
}
|
|
||||||
catch (...) {
|
|
||||||
std::cerr << "Cannot consume message because consumer initialization failed." << std::endl;
|
std::cerr << "Cannot consume message because consumer initialization failed." << std::endl;
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
@@ -407,24 +493,36 @@ int RoundRobinSelector(int queueNum, CMessage* msg, void* arg) {
|
|||||||
class RocketMQProducer {
|
class RocketMQProducer {
|
||||||
public:
|
public:
|
||||||
RocketMQProducer(const std::string& producerName, const std::string& nameServer)
|
RocketMQProducer(const std::string& producerName, const std::string& nameServer)
|
||||||
: producer_(NULL)
|
: producer_(producerName)
|
||||||
{
|
{
|
||||||
// 创建生产者
|
// 创建生产者
|
||||||
producer_ = CreateProducer(producerName.c_str());
|
/*producer_ = CreateProducer(producerName.c_str());
|
||||||
if (producer_ == NULL) {
|
if (producer_ == NULL) {
|
||||||
throw std::runtime_error("Failed to create producer.");
|
throw std::runtime_error("Failed to create producer.");
|
||||||
}
|
}*/
|
||||||
|
|
||||||
|
// 设置日志
|
||||||
|
producer_.setLogLevel(rocketmq::eLOG_LEVEL_ERROR);
|
||||||
|
producer_.setLogFileSizeAndNum(5, 50);
|
||||||
|
|
||||||
// 设置 nameserver 地址
|
// 设置 nameserver 地址
|
||||||
SetProducerNameServerAddress(producer_, nameServer.c_str());
|
//SetProducerNameServerAddress(producer_, nameServer.c_str());
|
||||||
|
producer_.setNamesrvAddr(nameServer);
|
||||||
|
|
||||||
//lnk20260417设置数据上送消息体最大值,默认4M,调整为1M,避免过大消息导致发送失败
|
//lnk20260417设置数据上送消息体最大值,默认4M,调整为1M,避免过大消息导致发送失败
|
||||||
SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB
|
//SetProducerMaxMessageSize(producer_, 1024 * 1024); // 1MB
|
||||||
|
producer_.setMaxMessageSize(1024 * 1024);
|
||||||
|
|
||||||
SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), "");
|
//SetProducerSessionCredentials(producer_, G_MQCONSUMER_ACCESSKEY.c_str(),G_MQCONSUMER_SECRETKEY.c_str(), "");
|
||||||
|
producer_.setSessionCredentials(
|
||||||
|
G_MQCONSUMER_ACCESSKEY,
|
||||||
|
G_MQCONSUMER_SECRETKEY,
|
||||||
|
""
|
||||||
|
);
|
||||||
|
|
||||||
// 启动生产者
|
// 启动生产者
|
||||||
StartProducer(producer_);
|
//StartProducer(producer_);
|
||||||
|
producer_.start();
|
||||||
|
|
||||||
std::cout << "rocketmq_Producer initialized and started." << std::endl;
|
std::cout << "rocketmq_Producer initialized and started." << std::endl;
|
||||||
}
|
}
|
||||||
@@ -433,7 +531,7 @@ public:
|
|||||||
RocketMQProducer(const RocketMQProducer&) = delete;
|
RocketMQProducer(const RocketMQProducer&) = delete;
|
||||||
RocketMQProducer& operator=(const RocketMQProducer&) = delete;
|
RocketMQProducer& operator=(const RocketMQProducer&) = delete;
|
||||||
|
|
||||||
void printSendResult(const CSendResult& result) {
|
/*void printSendResult(const CSendResult& result) {
|
||||||
std::cout << "SendResult:" << std::endl;
|
std::cout << "SendResult:" << std::endl;
|
||||||
std::cout << " Status: ";
|
std::cout << " Status: ";
|
||||||
switch (result.sendStatus) {
|
switch (result.sendStatus) {
|
||||||
@@ -457,10 +555,10 @@ public:
|
|||||||
|
|
||||||
std::cout << " MsgID : " << result.msgId << std::endl;
|
std::cout << " MsgID : " << result.msgId << std::endl;
|
||||||
std::cout << " Offset: " << result.offset << std::endl;
|
std::cout << " Offset: " << result.offset << std::endl;
|
||||||
}
|
}*/
|
||||||
|
|
||||||
// 发送消息
|
// 发送消息
|
||||||
void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) {
|
/* void sendMessage(const char* strbody, const char* topic, const std::string& tags, const std::string& keys) {
|
||||||
|
|
||||||
if (DEBUGOPEN) {
|
if (DEBUGOPEN) {
|
||||||
std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL")
|
std::cout << "sendMessage called with topic: " << (topic ? topic : "NULL")
|
||||||
@@ -585,7 +683,7 @@ public:
|
|||||||
<< std::endl;*/
|
<< std::endl;*/
|
||||||
// 发送消息:临时改成同步发送,绕过 orderly / selector,便于定位问题
|
// 发送消息:临时改成同步发送,绕过 orderly / selector,便于定位问题
|
||||||
|
|
||||||
if (sendResult == 0) { // 假设返回 0 表示成功
|
/*if (sendResult == 0) { // 假设返回 0 表示成功
|
||||||
std::cout << "[MQ][SEND_OK]"
|
std::cout << "[MQ][SEND_OK]"
|
||||||
<< " topic=" << (topic ? topic : "")
|
<< " topic=" << (topic ? topic : "")
|
||||||
<< ", tags=" << tags
|
<< ", tags=" << tags
|
||||||
@@ -630,20 +728,97 @@ public:
|
|||||||
DestroyMessage(msg);
|
DestroyMessage(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}*/
|
||||||
|
void sendMessage(const std::string& body,
|
||||||
|
const std::string& topic,
|
||||||
|
const std::string& tags,
|
||||||
|
const std::string& keys)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
if (DEBUGOPEN) {
|
||||||
|
std::cout << "sendMessage called with topic: " << topic
|
||||||
|
<< ", tags: " << tags
|
||||||
|
<< ", keys: " << keys
|
||||||
|
<< ", body_len=" << body.size()
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
size_t n = std::min((size_t)200, body.size());
|
||||||
|
|
||||||
|
std::cout << "[MQ][BODY_HEAD] "
|
||||||
|
<< body.substr(0, n)
|
||||||
|
<< std::endl;
|
||||||
|
|
||||||
|
if (body.size() > n) {
|
||||||
|
std::cout << "[MQ][BODY_TAIL] "
|
||||||
|
<< body.substr(body.size() - n, n)
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "[MQ][HEX_HEAD] ";
|
||||||
|
for (size_t i = 0; i < std::min((size_t)100, body.size()); ++i) {
|
||||||
|
printf("%02X ", (unsigned char)body[i]);
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
rocketmq::MQMessage msg(topic, tags, keys, body);
|
||||||
|
|
||||||
|
rocketmq::SendResult result = producer_.send(msg);
|
||||||
|
|
||||||
|
std::cout << "[MQ][SEND_OK]"
|
||||||
|
<< " topic=" << topic
|
||||||
|
<< ", tags=" << tags
|
||||||
|
<< ", keys=" << keys
|
||||||
|
<< ", msgId=" << result.getMsgId()
|
||||||
|
<< ", status=" << result.getSendStatus()
|
||||||
|
<< ", body_len=" << body.size()
|
||||||
|
<< std::endl;
|
||||||
|
}
|
||||||
|
catch (const rocketmq::MQClientException& e) {
|
||||||
|
std::cerr << "[MQ][SEND_FAIL] MQClientException: "
|
||||||
|
<< e.what() << std::endl;
|
||||||
|
|
||||||
|
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
|
||||||
|
"【ERROR】前置的%s%d号进程 mq发送失败,mq客户端错误,请检查mq配置",
|
||||||
|
get_front_msg_from_subdir(), g_front_seg_index);
|
||||||
|
}
|
||||||
|
catch (const std::exception& e) {
|
||||||
|
std::cerr << "[MQ][SEND_FAIL] exception: "
|
||||||
|
<< e.what() << std::endl;
|
||||||
|
|
||||||
|
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
|
||||||
|
"【ERROR】前置的%s%d号进程 mq发送失败,mq发送错误,发送请检查mq配置",
|
||||||
|
get_front_msg_from_subdir(), g_front_seg_index);
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
std::cerr << "[MQ][SEND_FAIL] unknown exception" << std::endl;
|
||||||
|
|
||||||
|
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
|
||||||
|
"【ERROR】前置的%s%d号进程 mq发送失败,未知错误,请检查mq配置",
|
||||||
|
get_front_msg_from_subdir(), g_front_seg_index);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// 析构函数中关闭并销毁生产者
|
// 析构函数中关闭并销毁生产者
|
||||||
~RocketMQProducer() {
|
~RocketMQProducer() {
|
||||||
if (producer_) {
|
/*if (producer_) {
|
||||||
ShutdownProducer(producer_);
|
ShutdownProducer(producer_);
|
||||||
DestroyProducer(producer_);
|
DestroyProducer(producer_);
|
||||||
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
|
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
|
||||||
|
}*/
|
||||||
|
try {
|
||||||
|
producer_.shutdown();
|
||||||
}
|
}
|
||||||
|
catch (...) {
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "rocketmq_Producer shutdown and destroyed." << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
CProducer* producer_;
|
//CProducer* producer_;
|
||||||
|
rocketmq::DefaultMQProducer producer_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// 全局生产者实例
|
// 全局生产者实例
|
||||||
@@ -654,7 +829,7 @@ void InitializeProducer()
|
|||||||
{
|
{
|
||||||
if (g_producer == NULL) {
|
if (g_producer == NULL) {
|
||||||
try {
|
try {
|
||||||
g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);
|
g_producer = new RocketMQProducer(G_ROCKETMQ_PRODUCER, G_ROCKETMQ_IPPORT);//生产者名称和NameServer地址
|
||||||
}
|
}
|
||||||
catch (const std::exception& e) {
|
catch (const std::exception& e) {
|
||||||
std::cerr << "Failed to initialize producer: " << e.what() << std::endl;
|
std::cerr << "Failed to initialize producer: " << e.what() << std::endl;
|
||||||
@@ -674,43 +849,37 @@ void ShutdownAndDestroyProducer()
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 发送消息的接口函数
|
// 发送消息的接口函数
|
||||||
void rocketmq_producer_send(const char* strbody, const char* topic)
|
void rocketmq_producer_send(const std::string& body,
|
||||||
|
const std::string& topic,
|
||||||
|
const std::string& tags,
|
||||||
|
const std::string& keys)
|
||||||
{
|
{
|
||||||
if (g_producer == NULL) {
|
std::call_once(g_producer_once, [&](){
|
||||||
try {
|
|
||||||
InitializeProducer();
|
InitializeProducer();
|
||||||
}
|
});
|
||||||
catch (...) {
|
|
||||||
std::cerr << "Cannot send message because producer initialization failed." << std::endl;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 假设 tags 和 keys 是固定的,可以根据需要修改
|
|
||||||
std::string tags = G_ROCKETMQ_TAG;
|
|
||||||
std::string keys = G_ROCKETMQ_KEY;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
g_producer->sendMessage(strbody, topic, tags, keys);
|
g_producer->sendMessage(body, topic, tags, keys);
|
||||||
}
|
} catch (const std::exception& e) {
|
||||||
catch (const std::exception& e) {
|
|
||||||
std::cerr << "Failed to send message: " << e.what() << std::endl;
|
std::cerr << "Failed to send message: " << e.what() << std::endl;
|
||||||
// 处理发送失败的情况,例如记录日志或重试
|
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,
|
||||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_MQ,"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置", get_front_msg_from_subdir(), g_front_seg_index);
|
"【ERROR】前置的%s%d号进程 mq发送失败,请检查mq配置",
|
||||||
|
get_front_msg_from_subdir(), g_front_seg_index);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/*
|
||||||
// producer_send0测试用
|
// producer_send0测试用
|
||||||
void StartSendMessage(CProducer* producer)
|
void StartSendMessage(CProducer* producer)
|
||||||
{
|
{
|
||||||
CSendResult result;
|
CSendResult result;
|
||||||
|
|
||||||
// create message and set some values for it
|
// create message and set some values for it
|
||||||
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str());
|
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str());
|
||||||
SetMessageTags(msg, G_ROCKETMQ_TAG.c_str());
|
SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str());
|
||||||
SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str());
|
SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str());
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++)
|
for (int i = 0; i < 10; i++)
|
||||||
{
|
{
|
||||||
@@ -734,9 +903,9 @@ void StartSendMessage(CProducer* producer,const char* strbody)
|
|||||||
CSendResult result;
|
CSendResult result;
|
||||||
|
|
||||||
// create message and set some values for it
|
// create message and set some values for it
|
||||||
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC.c_str());
|
CMessage* msg = CreateMessage(G_ROCKETMQ_TOPIC_TEST.c_str());
|
||||||
SetMessageTags(msg, G_ROCKETMQ_TAG.c_str());
|
SetMessageTags(msg, G_ROCKETMQ_TAG_TEST.c_str());
|
||||||
SetMessageKeys(msg, G_ROCKETMQ_KEY.c_str());
|
SetMessageKeys(msg, G_ROCKETMQ_KEY_TEST.c_str());
|
||||||
|
|
||||||
SetMessageBody(msg, strbody);
|
SetMessageBody(msg, strbody);
|
||||||
// send message
|
// send message
|
||||||
@@ -786,7 +955,7 @@ void producer_send(const char* strbody)
|
|||||||
DestroyProducer(producer);
|
DestroyProducer(producer);
|
||||||
cout << "Producer Shutdown!" << endl;
|
cout << "Producer Shutdown!" << endl;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
extern "C" {
|
extern "C" {
|
||||||
@@ -795,13 +964,13 @@ void rocketmq_test_rt()
|
|||||||
{
|
{
|
||||||
Ckafka_data_t data;
|
Ckafka_data_t data;
|
||||||
data.monitor_id = 123123;
|
data.monitor_id = 123123;
|
||||||
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT);
|
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RT);
|
||||||
std::ifstream file("rt.txt"); // 文件中存储长字符串
|
std::ifstream file("rt.txt"); // 文件中存储长字符串
|
||||||
std::stringstream buffer;
|
std::stringstream buffer;
|
||||||
buffer << file.rdbuf(); // 读取整个文件内容
|
buffer << file.rdbuf(); // 读取整个文件内容
|
||||||
|
|
||||||
data.strText = QString::fromStdString(buffer.str());
|
data.strText = QString::fromStdString(buffer.str());
|
||||||
data.mp_id = 123123;
|
data.mp_id = QString::number(123456);
|
||||||
my_rocketmq_send(data);
|
my_rocketmq_send(data);
|
||||||
}
|
}
|
||||||
//extern std::string G_MQCONSUMER_TOPIC_UD;
|
//extern std::string G_MQCONSUMER_TOPIC_UD;
|
||||||
@@ -809,13 +978,13 @@ void rocketmq_test_ud()//用来测试台账更新
|
|||||||
{
|
{
|
||||||
Ckafka_data_t data;
|
Ckafka_data_t data;
|
||||||
data.monitor_id = 123123;
|
data.monitor_id = 123123;
|
||||||
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD);
|
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_UD);
|
||||||
std::ifstream file("ud.txt"); // 文件中存储长字符串
|
std::ifstream file("ud.txt"); // 文件中存储长字符串
|
||||||
std::stringstream buffer;
|
std::stringstream buffer;
|
||||||
buffer << file.rdbuf(); // 读取整个文件内容
|
buffer << file.rdbuf(); // 读取整个文件内容
|
||||||
|
|
||||||
data.strText = QString::fromStdString(buffer.str());
|
data.strText = QString::fromStdString(buffer.str());
|
||||||
data.mp_id = 123123;
|
data.mp_id = QString::number(123456);
|
||||||
my_rocketmq_send(data);
|
my_rocketmq_send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -823,13 +992,13 @@ void rocketmq_test_set()//用来测试进程控制脚本
|
|||||||
{
|
{
|
||||||
Ckafka_data_t data;
|
Ckafka_data_t data;
|
||||||
data.monitor_id = 123123;
|
data.monitor_id = 123123;
|
||||||
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET);
|
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_SET);
|
||||||
std::ifstream file("set.txt"); // 文件中存储长字符串
|
std::ifstream file("set.txt"); // 文件中存储长字符串
|
||||||
std::stringstream buffer;
|
std::stringstream buffer;
|
||||||
buffer << file.rdbuf(); // 读取整个文件内容
|
buffer << file.rdbuf(); // 读取整个文件内容
|
||||||
|
|
||||||
data.strText = QString::fromStdString(buffer.str());
|
data.strText = QString::fromStdString(buffer.str());
|
||||||
data.mp_id = 123123;
|
data.mp_id = QString::number(123456);
|
||||||
my_rocketmq_send(data);
|
my_rocketmq_send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -837,13 +1006,13 @@ void rocketmq_test_only()//用来测试进程控制脚本
|
|||||||
{
|
{
|
||||||
Ckafka_data_t data;
|
Ckafka_data_t data;
|
||||||
data.monitor_id = 123123;
|
data.monitor_id = 123123;
|
||||||
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET);
|
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_TEST);
|
||||||
std::ifstream file("set_debug.txt"); // 文件中存储长字符串
|
std::ifstream file("test.txt"); // 文件中存储长字符串
|
||||||
std::stringstream buffer;
|
std::stringstream buffer;
|
||||||
buffer << file.rdbuf(); // 读取整个文件内容
|
buffer << file.rdbuf(); // 读取整个文件内容
|
||||||
|
|
||||||
data.strText = QString::fromStdString(buffer.str());
|
data.strText = QString::fromStdString(buffer.str());
|
||||||
data.mp_id = 123123;
|
data.mp_id = QString::number(123456);
|
||||||
my_rocketmq_send(data);
|
my_rocketmq_send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -852,13 +1021,13 @@ void rocketmq_test_rc()
|
|||||||
{
|
{
|
||||||
Ckafka_data_t data;
|
Ckafka_data_t data;
|
||||||
data.monitor_id = 123123;
|
data.monitor_id = 123123;
|
||||||
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC);
|
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_RC);
|
||||||
std::ifstream file("rc.txt"); // 文件中存储长字符串
|
std::ifstream file("rc.txt"); // 文件中存储长字符串
|
||||||
std::stringstream buffer;
|
std::stringstream buffer;
|
||||||
buffer << file.rdbuf(); // 读取整个文件内容
|
buffer << file.rdbuf(); // 读取整个文件内容
|
||||||
|
|
||||||
data.strText = QString::fromStdString(buffer.str());
|
data.strText = QString::fromStdString(buffer.str());
|
||||||
data.mp_id = 123123;
|
data.mp_id = QString::number(123456);
|
||||||
my_rocketmq_send(data);
|
my_rocketmq_send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -867,13 +1036,13 @@ void rocketmq_test_log()
|
|||||||
{
|
{
|
||||||
Ckafka_data_t data;
|
Ckafka_data_t data;
|
||||||
data.monitor_id = 123123;
|
data.monitor_id = 123123;
|
||||||
data.strTopic = QString::fromStdString(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG);
|
data.strTopic = QString::fromStdString(G_MQCONSUMER_TOPIC_LOG);
|
||||||
std::ifstream file("log_test.txt"); // 文件中存储长字符串
|
std::ifstream file("log_test.txt"); // 文件中存储长字符串
|
||||||
std::stringstream buffer;
|
std::stringstream buffer;
|
||||||
buffer << file.rdbuf(); // 读取整个文件内容
|
buffer << file.rdbuf(); // 读取整个文件内容
|
||||||
|
|
||||||
data.strText = QString::fromStdString(buffer.str());
|
data.strText = QString::fromStdString(buffer.str());
|
||||||
data.mp_id = 123123;
|
data.mp_id = QString::number(123456);
|
||||||
my_rocketmq_send(data);
|
my_rocketmq_send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -280,9 +280,9 @@ extern int g_front_seg_num;
|
|||||||
//生产者
|
//生产者
|
||||||
std::string G_ROCKETMQ_PRODUCER = "";//rocketmq producer
|
std::string G_ROCKETMQ_PRODUCER = "";//rocketmq producer
|
||||||
std::string G_ROCKETMQ_IPPORT = "";//rocketmq ip+port
|
std::string G_ROCKETMQ_IPPORT = "";//rocketmq ip+port
|
||||||
std::string G_ROCKETMQ_TOPIC = "";//topie
|
std::string G_ROCKETMQ_TOPIC_TEST = "";//topie
|
||||||
std::string G_ROCKETMQ_TAG = "";//tag
|
std::string G_ROCKETMQ_TAG_TEST = "";//tag
|
||||||
std::string G_ROCKETMQ_KEY = "";//key
|
std::string G_ROCKETMQ_KEY_TEST = "";//key
|
||||||
int QUEUENUM = 0;
|
int QUEUENUM = 0;
|
||||||
std::string BROKERNAME = "";
|
std::string BROKERNAME = "";
|
||||||
//消费者
|
//消费者
|
||||||
@@ -327,6 +327,12 @@ std::string G_MQCONSUMER_TOPIC_FILE = "";//consumer topie
|
|||||||
std::string G_MQCONSUMER_TAG_FILE = "";//consumer tag
|
std::string G_MQCONSUMER_TAG_FILE = "";//consumer tag
|
||||||
std::string G_MQCONSUMER_KEY_FILE = "";//consumer key
|
std::string G_MQCONSUMER_KEY_FILE = "";//consumer key
|
||||||
|
|
||||||
|
std::string G_REPLY_TOPIC_FILE = "";//consumer topie
|
||||||
|
std::string G_REPLY_TAG_FILE = "";//consumer tag
|
||||||
|
std::string G_REPLY_KEY_FILE = "";//consumer key
|
||||||
|
|
||||||
|
std::string G_MQCONSUMER_TOPIC_TEST = "";
|
||||||
|
|
||||||
int G_TEST_FLAG = 0;
|
int G_TEST_FLAG = 0;
|
||||||
int G_TEST_NUM = 0;
|
int G_TEST_NUM = 0;
|
||||||
int G_TEST_TYPE = 0;
|
int G_TEST_TYPE = 0;
|
||||||
@@ -644,12 +650,12 @@ void init_config() {
|
|||||||
G_ROCKETMQ_PRODUCER = strdup(ba.data());
|
G_ROCKETMQ_PRODUCER = strdup(ba.data());
|
||||||
ba = settings.value("RocketMq/Ipport", "").toString().toLatin1();
|
ba = settings.value("RocketMq/Ipport", "").toString().toLatin1();
|
||||||
G_ROCKETMQ_IPPORT = strdup(ba.data());
|
G_ROCKETMQ_IPPORT = strdup(ba.data());
|
||||||
ba = settings.value("RocketMq/Topic", "").toString().toLatin1();
|
ba = settings.value("RocketMq/TESTTopic", "").toString().toLatin1();
|
||||||
G_ROCKETMQ_TOPIC = strdup(ba.data());
|
G_ROCKETMQ_TOPIC_TEST = strdup(ba.data());
|
||||||
ba = settings.value("RocketMq/Tag", "").toString().toLatin1();
|
ba = settings.value("RocketMq/TESTTag", "").toString().toLatin1();
|
||||||
G_ROCKETMQ_TAG = strdup(ba.data());
|
G_ROCKETMQ_TAG_TEST = strdup(ba.data());
|
||||||
ba = settings.value("RocketMq/Key", "").toString().toLatin1();
|
ba = settings.value("RocketMq/TESTKey", "").toString().toLatin1();
|
||||||
G_ROCKETMQ_KEY = strdup(ba.data());
|
G_ROCKETMQ_KEY_TEST = strdup(ba.data());
|
||||||
QUEUENUM = settings.value("RocketMq/Queuenum", 0).toInt();
|
QUEUENUM = settings.value("RocketMq/Queuenum", 0).toInt();
|
||||||
|
|
||||||
//心跳
|
//心跳
|
||||||
@@ -724,12 +730,21 @@ void init_config() {
|
|||||||
G_CONNECT_KEY = strdup(ba.data());
|
G_CONNECT_KEY = strdup(ba.data());
|
||||||
|
|
||||||
//lnk20260310添加文件管理的topic和tag
|
//lnk20260310添加文件管理的topic和tag
|
||||||
ba = settings.value("RocketMq/ConsumerTopicFile", "").toString().toLatin1();
|
ba = settings.value("RocketMq/ConsumerTopicFILE", "").toString().toLatin1();
|
||||||
G_MQCONSUMER_TOPIC_FILE = strdup(ba.data());
|
G_MQCONSUMER_TOPIC_FILE = strdup(ba.data());
|
||||||
ba = settings.value("RocketMq/ConsumerTagFile", "").toString().toLatin1();
|
ba = settings.value("RocketMq/ConsumerTagFILE", "").toString().toLatin1();
|
||||||
G_MQCONSUMER_TAG_FILE = strdup(ba.data());
|
G_MQCONSUMER_TAG_FILE = strdup(ba.data());
|
||||||
ba = settings.value("RocketMq/ConsumerKeyFile", "").toString().toLatin1();
|
ba = settings.value("RocketMq/ConsumerKeyFILE", "").toString().toLatin1();
|
||||||
G_MQCONSUMER_KEY_FILE = strdup(ba.data());
|
G_MQCONSUMER_KEY_FILE = strdup(ba.data());
|
||||||
|
ba = settings.value("RocketMq/ReplyTopicFILE", "").toString().toLatin1();
|
||||||
|
G_REPLY_TOPIC_FILE = strdup(ba.data());
|
||||||
|
ba = settings.value("RocketMq/ReplyTagFILE", "").toString().toLatin1();
|
||||||
|
G_REPLY_TAG_FILE = strdup(ba.data());
|
||||||
|
ba = settings.value("RocketMq/ReplyKeyFILE", "").toString().toLatin1();
|
||||||
|
G_REPLY_KEY_FILE = strdup(ba.data());
|
||||||
|
|
||||||
|
ba = settings.value("RocketMq/ConsumerTopicTEST", "").toString().toLatin1();
|
||||||
|
G_MQCONSUMER_TOPIC_TEST = strdup(ba.data());
|
||||||
|
|
||||||
|
|
||||||
//MQ测试
|
//MQ测试
|
||||||
@@ -747,9 +762,9 @@ void init_config() {
|
|||||||
//生产者相关打印
|
//生产者相关打印
|
||||||
std::cout << "Read G_ROCKETMQ_PRODUCER:" << G_ROCKETMQ_PRODUCER << std::endl;
|
std::cout << "Read G_ROCKETMQ_PRODUCER:" << G_ROCKETMQ_PRODUCER << std::endl;
|
||||||
std::cout << "Read G_ROCKETMQ_IPPORT:" << G_ROCKETMQ_IPPORT << std::endl;
|
std::cout << "Read G_ROCKETMQ_IPPORT:" << G_ROCKETMQ_IPPORT << std::endl;
|
||||||
std::cout << "Read G_ROCKETMQ_TOPIC:" << G_ROCKETMQ_TOPIC << std::endl;
|
std::cout << "Read G_ROCKETMQ_TOPIC_TEST:" << G_ROCKETMQ_TOPIC_TEST << std::endl;
|
||||||
std::cout << "Read G_ROCKETMQ_TAG:" << G_ROCKETMQ_TAG << std::endl;
|
std::cout << "Read G_ROCKETMQ_TAG_TEST:" << G_ROCKETMQ_TAG_TEST << std::endl;
|
||||||
std::cout << "Read G_ROCKETMQ_KEY:" << G_ROCKETMQ_KEY << std::endl;
|
std::cout << "Read G_ROCKETMQ_KEY_TEST:" << G_ROCKETMQ_KEY_TEST << std::endl;
|
||||||
std::cout << "Read QUEUENUM:" << QUEUENUM << std::endl;
|
std::cout << "Read QUEUENUM:" << QUEUENUM << std::endl;
|
||||||
std::cout << "Read G_LOG_TOPIC:" << G_LOG_TOPIC << std::endl;
|
std::cout << "Read G_LOG_TOPIC:" << G_LOG_TOPIC << std::endl;
|
||||||
std::cout << "Read G_LOG_TAG:" << G_LOG_TAG << std::endl;
|
std::cout << "Read G_LOG_TAG:" << G_LOG_TAG << std::endl;
|
||||||
@@ -4189,6 +4204,18 @@ int terminal_ledger_web(QMap<QString, terminal_dev*>* terminal_dev_map,
|
|||||||
return 0; // 确保函数有返回值
|
return 0; // 确保函数有返回值
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void init_oper_type_cache(ied_usr_t *ied_usr)
|
||||||
|
{
|
||||||
|
if (ied_usr == NULL)
|
||||||
|
return;
|
||||||
|
|
||||||
|
ied_usr->oper_type_cache.inited = SD_FALSE;
|
||||||
|
|
||||||
|
ied_usr->oper_type_cache.ledrs_oper_type_id = -1;
|
||||||
|
ied_usr->oper_type_cache.reboot_oper_type_id = -1;
|
||||||
|
ied_usr->oper_type_cache.reset_oper_type_id = -1;
|
||||||
|
}
|
||||||
|
|
||||||
int parse_device_cfg_web()
|
int parse_device_cfg_web()
|
||||||
{
|
{
|
||||||
std::cout << "parse_device_cfg_web" << endl;
|
std::cout << "parse_device_cfg_web" << endl;
|
||||||
@@ -4422,6 +4449,10 @@ int parse_device_cfg_web()
|
|||||||
apr_snprintf(ied_usr->dev_key, sizeof(ied_usr->dev_key), "%s", "");//DEV_Key
|
apr_snprintf(ied_usr->dev_key, sizeof(ied_usr->dev_key), "%s", "");//DEV_Key
|
||||||
cout << "defalut dev_key:" << ied_usr->dev_key << endl;
|
cout << "defalut dev_key:" << ied_usr->dev_key << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//lnk20260512
|
||||||
|
init_oper_type_cache(ied_usr);
|
||||||
|
|
||||||
//lnk20260304
|
//lnk20260304
|
||||||
ied_usr->log_level = log_level;//日志等级
|
ied_usr->log_level = log_level;//日志等级
|
||||||
cout << "ied_usr->log_level:" << ied_usr->log_level << endl;
|
cout << "ied_usr->log_level:" << ied_usr->log_level << endl;
|
||||||
@@ -5301,7 +5332,7 @@ std::string base64_encode(const std::string& in) {
|
|||||||
return out; // 返回编码后的字符串
|
return out; // 返回编码后的字符串
|
||||||
}
|
}
|
||||||
|
|
||||||
void handleUploadResponse(const std::string& response, char* wavepath) {
|
void handleUploadResponse(const std::string& response,const std::string& localPath, char* wavepath, int type) {
|
||||||
|
|
||||||
// 解析 JSON 响应
|
// 解析 JSON 响应
|
||||||
cJSON* json_data = cJSON_Parse(response.c_str());
|
cJSON* json_data = cJSON_Parse(response.c_str());
|
||||||
@@ -5349,11 +5380,43 @@ void handleUploadResponse(const std::string& response, char* wavepath) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 拷贝到 wavepath
|
// 拷贝到 wavepath
|
||||||
|
if (type == 1) {
|
||||||
strcpy(wavepath, nameWithoutExt.c_str());
|
strcpy(wavepath, nameWithoutExt.c_str());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
strcpy(wavepath, name.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
std::cout << "wavepath: " << wavepath << std::endl;
|
std::cout << "wavepath: " << wavepath << std::endl;
|
||||||
|
|
||||||
DIY_INFOLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【NORMAL】前置上传文件成功,远端文件名:%s",wavepath);
|
DIY_INFOLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【NORMAL】前置上传文件成功,远端文件名:%s",wavepath);
|
||||||
|
|
||||||
|
// =========================
|
||||||
|
// 上传成功后删除本地文件
|
||||||
|
// =========================
|
||||||
|
if (remove(localPath.c_str()) == 0)
|
||||||
|
{
|
||||||
|
std::cout << "Delete local file success: "
|
||||||
|
<< localPath << std::endl;
|
||||||
|
DIY_INFOLOG_CODE("process",0,
|
||||||
|
LOG_CODE_TRANSIENT_COMM,
|
||||||
|
"【NORMAL】删除本地文件成功:%s",
|
||||||
|
localPath.c_str());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::cout << "Delete local file failed: "
|
||||||
|
<< localPath
|
||||||
|
<< " errno=" << errno
|
||||||
|
<< " err=" << strerror(errno)
|
||||||
|
<< std::endl;
|
||||||
|
DIY_ERRORLOG_CODE("process",0,
|
||||||
|
LOG_CODE_TRANSIENT_COMM,
|
||||||
|
"【ERROR】删除本地文件失败:%s errno=%d err=%s",
|
||||||
|
localPath.c_str(),
|
||||||
|
errno,
|
||||||
|
strerror(errno));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
std::cerr << "Error: Missing expected fields in JSON response." << std::endl;
|
std::cerr << "Error: Missing expected fields in JSON response." << std::endl;
|
||||||
@@ -5423,7 +5486,7 @@ void handleUploadResponse(const std::string& response, char* wavepath) {
|
|||||||
}*/
|
}*/
|
||||||
|
|
||||||
//这是dataform发送方式
|
//这是dataform发送方式
|
||||||
void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, char* wavepath) {
|
void SendFileWeb(const std::string& strUrl, const char* localpath, const char* cloudpath, char* wavepath,int type) {
|
||||||
// 初始化 curl
|
// 初始化 curl
|
||||||
CURL* curl = curl_easy_init();
|
CURL* curl = curl_easy_init();
|
||||||
if (curl) {
|
if (curl) {
|
||||||
@@ -5477,7 +5540,7 @@ void SendFileWeb(const std::string& strUrl, const char* localpath, const char* c
|
|||||||
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【ERROR】前置上传暂态录波文件 %s 失败,请检查文件上传接口配置",localpath);
|
DIY_ERRORLOG_CODE("process",0,LOG_CODE_TRANSIENT_COMM,"【ERROR】前置上传暂态录波文件 %s 失败,请检查文件上传接口配置",localpath);
|
||||||
} else {
|
} else {
|
||||||
std::cout << "http web success, response: " << resPost0 << std::endl;
|
std::cout << "http web success, response: " << resPost0 << std::endl;
|
||||||
handleUploadResponse(resPost0, wavepath); // 处理响应
|
handleUploadResponse(resPost0, localpath, wavepath, type); // 处理响应
|
||||||
}
|
}
|
||||||
|
|
||||||
// 清理
|
// 清理
|
||||||
@@ -5492,7 +5555,7 @@ void SendFileWeb(const std::string& strUrl, const char* localpath, const char* c
|
|||||||
void SOEFileWeb(char* localpath,char* cloudpath, char* wavepath)
|
void SOEFileWeb(char* localpath,char* cloudpath, char* wavepath)
|
||||||
{
|
{
|
||||||
//示例ip,更换为实际ip即可
|
//示例ip,更换为实际ip即可
|
||||||
SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath);
|
SendFileWeb(WEB_FILEUPLOAD,localpath,cloudpath,wavepath,1);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SOEFileWeb_test()
|
void SOEFileWeb_test()
|
||||||
@@ -5545,9 +5608,9 @@ int DownloadFileWeb(const std::string& strUrl,
|
|||||||
|
|
||||||
std::string fullUrl = strUrl;
|
std::string fullUrl = strUrl;
|
||||||
if (fullUrl.find('?') == std::string::npos)
|
if (fullUrl.find('?') == std::string::npos)
|
||||||
fullUrl += "?path=";
|
fullUrl += "?filePath=";
|
||||||
else
|
else
|
||||||
fullUrl += "&path=";
|
fullUrl += "&filePath=";
|
||||||
fullUrl += encodedPath;
|
fullUrl += encodedPath;
|
||||||
|
|
||||||
curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
|
curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
|
||||||
@@ -5569,15 +5632,26 @@ int DownloadFileWeb(const std::string& strUrl,
|
|||||||
if (res != CURLE_OK)
|
if (res != CURLE_OK)
|
||||||
{
|
{
|
||||||
std::cerr << "DownloadFileWeb failed: " << curl_easy_strerror(res) << std::endl;
|
std::cerr << "DownloadFileWeb failed: " << curl_easy_strerror(res) << std::endl;
|
||||||
|
remove(localpath);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (http_code != 200)
|
if (http_code != 200)
|
||||||
{
|
{
|
||||||
std::cerr << "DownloadFileWeb http code: " << http_code << std::endl;
|
std::cerr << "DownloadFileWeb http code: " << http_code << std::endl;
|
||||||
|
remove(localpath);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (chmod(localpath, 0777) != 0)
|
||||||
|
{
|
||||||
|
std::cerr << "chmod 777 failed: " << localpath << std::endl;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
std::cout << "chmod 777 success: " << localpath << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5711,6 +5785,9 @@ int update_one_terminal_ledger(terminal* update, int i,ied_t* ied,int terminal_i
|
|||||||
chnl_usr->m_state = CHANNEL_DISCONNECTED;
|
chnl_usr->m_state = CHANNEL_DISCONNECTED;
|
||||||
chnl_usr->m_ClosedMsTime = NEXT_CONNECT_TIME * (-1);
|
chnl_usr->m_ClosedMsTime = NEXT_CONNECT_TIME * (-1);
|
||||||
|
|
||||||
|
//lnk20260512
|
||||||
|
init_oper_type_cache(ied_usr);
|
||||||
|
|
||||||
// 将 monitorData 中的数据写入到 LD_info 中
|
// 将 monitorData 中的数据写入到 LD_info 中
|
||||||
int count_real_monitor = 0; //遍历监测点台账的计数器
|
int count_real_monitor = 0; //遍历监测点台账的计数器
|
||||||
int j;
|
int j;
|
||||||
@@ -6528,7 +6605,7 @@ bool shouldSkipTerminal(const char* terminal_id) {
|
|||||||
|
|
||||||
void rocketmq_test_300(int mpnum,int front_index,int type) {
|
void rocketmq_test_300(int mpnum,int front_index,int type) {
|
||||||
Ckafka_data_t data;
|
Ckafka_data_t data;
|
||||||
data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC);
|
data.strTopic = QString::fromStdString(G_ROCKETMQ_TOPIC_TEST);
|
||||||
data.mp_id = "0";
|
data.mp_id = "0";
|
||||||
|
|
||||||
// 读取文件内容
|
// 读取文件内容
|
||||||
@@ -7167,6 +7244,7 @@ void send_reply_to_kafka(const std::string& guid, const std::string& step, const
|
|||||||
// 封装 Kafka 消息
|
// 封装 Kafka 消息
|
||||||
Ckafka_data_t connect_info;
|
Ckafka_data_t connect_info;
|
||||||
connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic);
|
connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic);
|
||||||
|
connect_info.mp_id = QString::fromStdString(guid);//guid作为key
|
||||||
connect_info.strText = QString::fromStdString(jsonString);
|
connect_info.strText = QString::fromStdString(jsonString);
|
||||||
|
|
||||||
// 加入发送队列(带互斥锁保护)
|
// 加入发送队列(带互斥锁保护)
|
||||||
@@ -7195,6 +7273,7 @@ void send_reply_to_kafka_recall(const std::string& guid, const std::string& step
|
|||||||
// 封装 Kafka 消息
|
// 封装 Kafka 消息
|
||||||
Ckafka_data_t connect_info;
|
Ckafka_data_t connect_info;
|
||||||
connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic);
|
connect_info.strTopic = QString::fromStdString(Topic_Reply_Topic);
|
||||||
|
connect_info.mp_id = QString::fromStdString(guid);//guid作为key
|
||||||
connect_info.strText = QString::fromStdString(jsonString);
|
connect_info.strText = QString::fromStdString(jsonString);
|
||||||
|
|
||||||
// 加入发送队列(带互斥锁保护)
|
// 加入发送队列(带互斥锁保护)
|
||||||
@@ -7204,20 +7283,25 @@ void send_reply_to_kafka_recall(const std::string& guid, const std::string& step
|
|||||||
}
|
}
|
||||||
|
|
||||||
void send_heartbeat_to_kafka(const std::string& status) {
|
void send_heartbeat_to_kafka(const std::string& status) {
|
||||||
|
|
||||||
|
std::string front_type = get_front_type_from_subdir();
|
||||||
// 构造 JSON 字符串
|
// 构造 JSON 字符串
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
oss << "{"
|
oss << "{"
|
||||||
<< "\"nodeId\":\"" << FRONT_INST << "\","
|
<< "\"nodeId\":\"" << FRONT_INST << "\","
|
||||||
<< "\"frontType\":\"" << get_front_type_from_subdir() << "\","
|
<< "\"frontType\":\"" << front_type << "\","
|
||||||
<< "\"processNo\":\"" << g_front_seg_index << "\","
|
<< "\"processNo\":\"" << g_front_seg_index << "\","
|
||||||
<< "\"status\":\"" << status << "\""
|
<< "\"status\":\"" << status << "\""
|
||||||
<< "}";
|
<< "}";
|
||||||
|
|
||||||
std::string jsonString = oss.str();
|
std::string jsonString = oss.str();
|
||||||
|
|
||||||
|
std::string mpid_str = std::to_string(g_node_id) + "_" + std::to_string(g_front_seg_index);
|
||||||
|
|
||||||
// 封装 Kafka 消息
|
// 封装 Kafka 消息
|
||||||
Ckafka_data_t connect_info;
|
Ckafka_data_t connect_info;
|
||||||
connect_info.strTopic = QString::fromStdString(Heart_Beat_Topic);
|
connect_info.strTopic = QString::fromStdString(Heart_Beat_Topic);
|
||||||
|
connect_info.mp_id = QString::fromStdString(mpid_str);
|
||||||
connect_info.strText = QString::fromStdString(jsonString);
|
connect_info.strText = QString::fromStdString(jsonString);
|
||||||
|
|
||||||
// 加入发送队列(带互斥锁保护)
|
// 加入发送队列(带互斥锁保护)
|
||||||
|
|||||||
@@ -360,19 +360,23 @@ protected:
|
|||||||
final_msg = suppressed_oss.str();
|
final_msg = suppressed_oss.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string business_id = extract_logger_id(logger_name);
|
||||||
|
std::string front_type = get_front_type_from_subdir();
|
||||||
|
|
||||||
std::ostringstream oss;
|
std::ostringstream oss;
|
||||||
oss << "{\"processNo\":\"" << intToString(g_front_seg_index)
|
oss << "{\"processNo\":\"" << intToString(g_front_seg_index)
|
||||||
<< "\",\"nodeId\":\"" << FRONT_INST
|
<< "\",\"nodeId\":\"" << escape_json(FRONT_INST)
|
||||||
<< "\",\"businessId\":\"" << extract_logger_id(logger_name)
|
<< "\",\"businessId\":\"" << escape_json(business_id)
|
||||||
<< "\",\"level\":\"" << level_str
|
<< "\",\"level\":\"" << escape_json(level_str)
|
||||||
<< "\",\"grade\":\"" << get_level_str(level)
|
<< "\",\"grade\":\"" << escape_json(get_level_str(level))
|
||||||
<< "\",\"logtype\":\"" << safe_logtype
|
<< "\",\"logtype\":\"" << safe_logtype
|
||||||
<< "\",\"frontType\":\"" << get_front_type_from_subdir()
|
<< "\",\"frontType\":\"" << escape_json(front_type)
|
||||||
<< "\",\"code\":" << code
|
<< "\",\"code\":" << code
|
||||||
<< ",\"log\":\"" << escape_json(final_msg) << "\"}";
|
<< ",\"log\":\"" << escape_json(final_msg) << "\"}";
|
||||||
|
|
||||||
Ckafka_data_t connect_info;
|
Ckafka_data_t connect_info;
|
||||||
connect_info.strTopic = QString::fromStdString(G_LOG_TOPIC);
|
connect_info.strTopic = QString::fromStdString(G_LOG_TOPIC);
|
||||||
|
connect_info.mp_id = QString::fromStdString(business_id);
|
||||||
connect_info.strText = QString::fromStdString(oss.str());
|
connect_info.strText = QString::fromStdString(oss.str());
|
||||||
|
|
||||||
kafka_data_list_mutex.lock();
|
kafka_data_list_mutex.lock();
|
||||||
|
|||||||
@@ -224,8 +224,19 @@ extern "C" {
|
|||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <sys/resource.h>
|
#include <sys/resource.h>
|
||||||
#endif
|
#endif
|
||||||
|
/*
|
||||||
#define max(a,b) (((a) > (b)) ? (a) : (b))
|
#define max(a,b) (((a) > (b)) ? (a) : (b))
|
||||||
#define min(a,b) (((a) < (b)) ? (a) : (b))
|
#define min(a,b) (((a) < (b)) ? (a) : (b))
|
||||||
|
*/
|
||||||
|
#ifndef __cplusplus
|
||||||
|
#ifndef max
|
||||||
|
#define max(a,b) (((a) > (b)) ? (a) : (b))
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifndef min
|
||||||
|
#define min(a,b) (((a) < (b)) ? (a) : (b))
|
||||||
|
#endif
|
||||||
|
#endif
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
|
|||||||
@@ -1,21 +1,34 @@
|
|||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
#include "../json/mms_json_inter.h"
|
#include "../json/mms_json_inter.h"
|
||||||
#include "../rocketmq/CProducer.h"
|
//#include "../rocketmq/CProducer.h"
|
||||||
#include "../rocketmq/CMessage.h"
|
//#include "../rocketmq/CMessage.h"
|
||||||
#include "../rocketmq/CSendResult.h"
|
//#include "../rocketmq/CSendResult.h"
|
||||||
|
//#include "../rocketmq/CPushConsumer.h"
|
||||||
#include "../rocketmq/CPushConsumer.h"
|
#include "../rocketmq/DefaultMQProducer.h"
|
||||||
|
#include "../rocketmq/MQMessage.h"
|
||||||
|
#include "../rocketmq/SendResult.h"
|
||||||
|
#include "../rocketmq/SessionCredentials.h"
|
||||||
|
#include "../rocketmq/MQMessageExt.h"
|
||||||
|
#include "../rocketmq/ConsumeType.h"
|
||||||
|
#include "../rocketmq/MQMessageListener.h"
|
||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <iostream>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
using namespace rocketmq;
|
||||||
|
|
||||||
/*添加测试函数lnk10-10*/
|
/*添加测试函数lnk10-10*/
|
||||||
void producer_send0();
|
//void producer_send0();
|
||||||
void StartSendMessage(CProducer* producer,const char* strbody);
|
//void StartSendMessage(CProducer* producer,const char* strbody);
|
||||||
void producer_send(const char* strbody);
|
//void producer_send(const char* strbody);
|
||||||
void rocketmq_producer_send(const char* strbody,const char* topic);
|
//void rocketmq_producer_send(const char* strbody,const char* topic);
|
||||||
void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic);
|
//void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic);
|
||||||
|
void rocketmq_producer_send(const std::string& body,
|
||||||
|
const std::string& topic,
|
||||||
|
const std::string& tags,
|
||||||
|
const std::string& keys);
|
||||||
extern "C" {
|
extern "C" {
|
||||||
void rocketmq_test_rt();
|
void rocketmq_test_rt();
|
||||||
void rocketmq_test_ud();
|
void rocketmq_test_ud();
|
||||||
@@ -32,24 +45,29 @@ extern void my_rocketmq_send(Ckafka_data_t& data);
|
|||||||
void InitializeProducer();
|
void InitializeProducer();
|
||||||
void ShutdownAndDestroyProducer();
|
void ShutdownAndDestroyProducer();
|
||||||
//////////////////////////////////////////////////////消费者
|
//////////////////////////////////////////////////////消费者
|
||||||
void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key);
|
typedef ConsumeStatus (*MessageCallBack)(
|
||||||
void ShutdownAndDestroyConsumer();
|
const MQMessageExt& msg
|
||||||
|
);
|
||||||
|
|
||||||
struct Subscription {
|
struct Subscription {
|
||||||
std::string topic;
|
std::string topic;
|
||||||
std::string tag;
|
std::string tag;
|
||||||
MessageCallBack callback;
|
MessageCallBack callback;
|
||||||
|
|
||||||
Subscription(const std::string& t, const std::string& tg, MessageCallBack cb)
|
Subscription(const std::string& t,
|
||||||
|
const std::string& tg,
|
||||||
|
MessageCallBack cb)
|
||||||
: topic(t), tag(tg), callback(cb) {}
|
: topic(t), tag(tg), callback(cb) {}
|
||||||
};
|
};
|
||||||
|
//void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key);
|
||||||
|
void InitializeConsumer(const std::string& consumerName,
|
||||||
|
const std::string& nameServer,
|
||||||
|
const std::vector<Subscription>& subscriptions);
|
||||||
|
void ShutdownAndDestroyConsumer();
|
||||||
|
|
||||||
void rocketmq_consumer_receive(
|
void rocketmq_consumer_receive(
|
||||||
const std::string& consumerName,
|
const std::string& consumerName,
|
||||||
const std::string& nameServer,
|
const std::string& nameServer,
|
||||||
//const std::string& topic,
|
|
||||||
//const std::string& tag,
|
|
||||||
//MessageCallBack callback);
|
|
||||||
const std::vector<Subscription>& subscriptions);
|
const std::vector<Subscription>& subscriptions);
|
||||||
|
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
|
|||||||
@@ -49,6 +49,16 @@
|
|||||||
#ifndef HIBYTE
|
#ifndef HIBYTE
|
||||||
#define HIBYTE(w) ((byte_t)((uint16_t)(w) >> 8))
|
#define HIBYTE(w) ((byte_t)((uint16_t)(w) >> 8))
|
||||||
#endif
|
#endif
|
||||||
|
/*
|
||||||
|
#ifndef max
|
||||||
|
#define max(a,b) (((a) > (b)) ? (a) : (b))
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifndef min
|
||||||
|
#define min(a,b) (((a) < (b)) ? (a) : (b))
|
||||||
|
#endif
|
||||||
|
*/
|
||||||
|
#ifndef __cplusplus
|
||||||
|
|
||||||
#ifndef max
|
#ifndef max
|
||||||
#define max(a,b) (((a) > (b)) ? (a) : (b))
|
#define max(a,b) (((a) > (b)) ? (a) : (b))
|
||||||
@@ -58,6 +68,7 @@
|
|||||||
#define min(a,b) (((a) < (b)) ? (a) : (b))
|
#define min(a,b) (((a) < (b)) ? (a) : (b))
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#endif
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
1297
json/PQSMsg.cpp
Normal file
1297
json/PQSMsg.cpp
Normal file
File diff suppressed because it is too large
Load Diff
2548
json/PQSMsg.h
Normal file
2548
json/PQSMsg.h
Normal file
File diff suppressed because it is too large
Load Diff
@@ -45,6 +45,8 @@ extern std::string WEB_EVENT;
|
|||||||
extern std::string WEB_FILEDOWNLOAD;
|
extern std::string WEB_FILEDOWNLOAD;
|
||||||
extern std::string G_CONNECT_TOPIC;
|
extern std::string G_CONNECT_TOPIC;
|
||||||
|
|
||||||
|
extern int RECALL_ONLY_FLAG;
|
||||||
|
|
||||||
//lnk20250115添加台账锁
|
//lnk20250115添加台账锁
|
||||||
extern pthread_mutex_t mtx;
|
extern pthread_mutex_t mtx;
|
||||||
|
|
||||||
@@ -164,6 +166,7 @@ public:
|
|||||||
QString WavePhasicB;
|
QString WavePhasicB;
|
||||||
QString WavePhasicC;
|
QString WavePhasicC;
|
||||||
QString TypeOfData; //闪变和统计是否合并 0-分开 1-合并
|
QString TypeOfData; //闪变和统计是否合并 0-分开 1-合并
|
||||||
|
QString IEDControl; //例:LD0 lnk2026-5-13
|
||||||
QString UnitOfTimeUnit; //暂态事件持续事件单位:0 - 毫秒 1 - 秒 lnk20260127
|
QString UnitOfTimeUnit; //暂态事件持续事件单位:0 - 毫秒 1 - 秒 lnk20260127
|
||||||
QString ValueOfTimeUnit; //上送值的时间:UTC-UTC时间 beijing-北京时间
|
QString ValueOfTimeUnit; //上送值的时间:UTC-UTC时间 beijing-北京时间
|
||||||
QString WaveTimeFlag; //录波文件的时间:UTC-UTC时间 beijing-北京时间
|
QString WaveTimeFlag; //录波文件的时间:UTC-UTC时间 beijing-北京时间
|
||||||
@@ -382,6 +385,7 @@ bool get_xml_config_by_dev_type(const char* dev_type, XmlConfigC* out_cfg) {
|
|||||||
strncpy(out_cfg->TypeOfData, cfg.TypeOfData.toUtf8().constData(), sizeof(out_cfg->TypeOfData) - 1); out_cfg->TypeOfData[sizeof(out_cfg->TypeOfData) - 1] = '\0';//lnk20260127
|
strncpy(out_cfg->TypeOfData, cfg.TypeOfData.toUtf8().constData(), sizeof(out_cfg->TypeOfData) - 1); out_cfg->TypeOfData[sizeof(out_cfg->TypeOfData) - 1] = '\0';//lnk20260127
|
||||||
strncpy(out_cfg->ValueOfTimeUnit, cfg.ValueOfTimeUnit.toUtf8().constData(),sizeof(out_cfg->ValueOfTimeUnit) - 1);
|
strncpy(out_cfg->ValueOfTimeUnit, cfg.ValueOfTimeUnit.toUtf8().constData(),sizeof(out_cfg->ValueOfTimeUnit) - 1);
|
||||||
strncpy(out_cfg->WaveTimeFlag, cfg.WaveTimeFlag.toUtf8().constData(), sizeof(out_cfg->WaveTimeFlag) - 1);
|
strncpy(out_cfg->WaveTimeFlag, cfg.WaveTimeFlag.toUtf8().constData(), sizeof(out_cfg->WaveTimeFlag) - 1);
|
||||||
|
strncpy(out_cfg->IEDControl, cfg.IEDControl.toUtf8().constData(), sizeof(out_cfg->IEDControl) - 1);//lnk2026-5-13
|
||||||
strncpy(out_cfg->IEDname, cfg.IEDname.toUtf8().constData(), sizeof(out_cfg->IEDname) - 1);
|
strncpy(out_cfg->IEDname, cfg.IEDname.toUtf8().constData(), sizeof(out_cfg->IEDname) - 1);
|
||||||
strncpy(out_cfg->LDevicePrefix, cfg.LDevicePrefix.toUtf8().constData(), sizeof(out_cfg->LDevicePrefix) - 1);
|
strncpy(out_cfg->LDevicePrefix, cfg.LDevicePrefix.toUtf8().constData(), sizeof(out_cfg->LDevicePrefix) - 1);
|
||||||
|
|
||||||
@@ -1034,6 +1038,10 @@ bool ParseXMLConfig2(int xml_flag, XmlConfig *cfg, list<CTopic*> *ctopiclist,QSt
|
|||||||
cfg->WaveTimeFlag.append(e.attribute("WaveTimeFlag"));
|
cfg->WaveTimeFlag.append(e.attribute("WaveTimeFlag"));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
if ("IEDControl" == strTag)
|
||||||
|
{
|
||||||
|
cfg->IEDControl.append(e.attribute("name"));
|
||||||
|
}
|
||||||
if ("IED" == strTag)
|
if ("IED" == strTag)
|
||||||
{
|
{
|
||||||
cfg->IEDname.append(e.attribute("name"));
|
cfg->IEDname.append(e.attribute("name"));
|
||||||
@@ -1253,6 +1261,7 @@ int transfer_json_block_data(char v_wiring_type[], json_block_data *data) //json
|
|||||||
printf("TypeOfData = '%s'\n", cfg1.TypeOfData);
|
printf("TypeOfData = '%s'\n", cfg1.TypeOfData);
|
||||||
printf("ValueOfTimeUnit = '%s'\n", cfg1.ValueOfTimeUnit);
|
printf("ValueOfTimeUnit = '%s'\n", cfg1.ValueOfTimeUnit);
|
||||||
printf("WaveTimeFlag = '%s'\n", cfg1.WaveTimeFlag);
|
printf("WaveTimeFlag = '%s'\n", cfg1.WaveTimeFlag);
|
||||||
|
printf("IEDControl = '%s'\n", cfg1.IEDControl);
|
||||||
printf("IEDname = '%s'\n", cfg1.IEDname);
|
printf("IEDname = '%s'\n", cfg1.IEDname);
|
||||||
printf("LDevicePrefix = '%s'\n", cfg1.LDevicePrefix);
|
printf("LDevicePrefix = '%s'\n", cfg1.LDevicePrefix);
|
||||||
printf("=====================================\n");
|
printf("=====================================\n");
|
||||||
@@ -3271,14 +3280,13 @@ void connectlog_pgsql(char* id,char* datetime,int status)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//使用mq
|
//使用mq
|
||||||
Ckafka_data_t connect_info;
|
Ckafka_data_t connect_info;
|
||||||
connect_info.strTopic = QString::fromStdString(G_CONNECT_TOPIC);
|
connect_info.strTopic = QString::fromStdString(G_CONNECT_TOPIC);
|
||||||
|
connect_info.mp_id = QString::fromLocal8Bit(id);//这里填装置id,后续作为key
|
||||||
connect_info.strText = QString::fromStdString(std::string(jsonString));
|
connect_info.strText = QString::fromStdString(std::string(jsonString));
|
||||||
|
|
||||||
if(g_node_id == STAT_DATA_BASE_NODE_ID){//稳态才上传
|
if((g_node_id == STAT_DATA_BASE_NODE_ID && RECALL_ONLY_FLAG == 0) || (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID && RECALL_ONLY_FLAG == 1)){//稳态或者补招才上传
|
||||||
kafka_data_list_mutex.lock(); //加锁
|
kafka_data_list_mutex.lock(); //加锁
|
||||||
kafka_data_list.append(connect_info); //添加 kafka发送链表
|
kafka_data_list.append(connect_info); //添加 kafka发送链表
|
||||||
kafka_data_list_mutex.unlock(); //解锁
|
kafka_data_list_mutex.unlock(); //解锁
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
BIN
librocketmq.a
BIN
librocketmq.a
Binary file not shown.
BIN
librocketmq.so
BIN
librocketmq.so
Binary file not shown.
@@ -113,12 +113,12 @@ extern LOG_TLS int g_log_code_tls; // 声明为 TLS 变量,定义见 log4.cpp
|
|||||||
int __old_code__ = g_log_code_tls; \
|
int __old_code__ = g_log_code_tls; \
|
||||||
g_log_code_tls = (int)(CODE_INT); \
|
g_log_code_tls = (int)(CODE_INT); \
|
||||||
\
|
\
|
||||||
char __msg_buf__[256]; \
|
char __msg_buf__[512]; \
|
||||||
format_log_msg(__msg_buf__, sizeof(__msg_buf__), __VA_ARGS__); \
|
format_log_msg(__msg_buf__, sizeof(__msg_buf__), __VA_ARGS__); \
|
||||||
\
|
\
|
||||||
const char* __key_raw__ = (KEY); \
|
const char* __key_raw__ = (KEY); \
|
||||||
\
|
\
|
||||||
char __key_buf__[256]; \
|
char __key_buf__[512]; \
|
||||||
switch ((int)(KEY_TYPE)) { \
|
switch ((int)(KEY_TYPE)) { \
|
||||||
case 0: \
|
case 0: \
|
||||||
snprintf(__key_buf__, sizeof(__key_buf__), "process"); \
|
snprintf(__key_buf__, sizeof(__key_buf__), "process"); \
|
||||||
|
|||||||
@@ -165,6 +165,7 @@ typedef struct {
|
|||||||
char TypeOfData[64];
|
char TypeOfData[64];
|
||||||
char ValueOfTimeUnit[64];
|
char ValueOfTimeUnit[64];
|
||||||
char WaveTimeFlag[64];
|
char WaveTimeFlag[64];
|
||||||
|
char IEDControl[64];
|
||||||
char IEDname[64];
|
char IEDname[64];
|
||||||
char LDevicePrefix[64];
|
char LDevicePrefix[64];
|
||||||
} XmlConfigC;
|
} XmlConfigC;
|
||||||
@@ -183,6 +184,8 @@ typedef struct file_dir_req_t
|
|||||||
char devid[128];
|
char devid[128];
|
||||||
int type;
|
int type;
|
||||||
char path[256];
|
char path[256];
|
||||||
|
char remote_path[256];
|
||||||
|
|
||||||
time_t create_time;
|
time_t create_time;
|
||||||
} file_dir_req_t;
|
} file_dir_req_t;
|
||||||
|
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ extern apr_pool_t* g_cfg_pool;
|
|||||||
extern apr_pool_t* g_init_pool;
|
extern apr_pool_t* g_init_pool;
|
||||||
|
|
||||||
extern int g_DevFlag; //日志配置中读取的参数,暂无特定使用lnk20250121
|
extern int g_DevFlag; //日志配置中读取的参数,暂无特定使用lnk20250121
|
||||||
|
extern bool DEBUGOPEN;//调试开关,控制是否输出调试日志,默认关闭
|
||||||
extern int IED_COUNT;
|
extern int IED_COUNT;
|
||||||
extern int RECALL_ONLY_FLAG; //lnk20260309添加一个全局变量,标志是否只运行补招程序
|
extern int RECALL_ONLY_FLAG; //lnk20260309添加一个全局变量,标志是否只运行补招程序
|
||||||
|
|
||||||
@@ -1531,6 +1531,8 @@ void CheckAllConnectedChannel()
|
|||||||
if(chnl_usr->m_state == CHANNEL_CONNECTED)
|
if(chnl_usr->m_state == CHANNEL_CONNECTED)
|
||||||
{
|
{
|
||||||
if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
|
if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {
|
||||||
|
InitLedrsOperTypeForChannel(chnl_usr);//写特殊控制的初始化
|
||||||
|
if(DEBUGOPEN)printf("[FILEDIR] enter HandleFileDirReqForChannel");
|
||||||
HandleFileDirReqForChannel(chnl_usr);//文件目录请求
|
HandleFileDirReqForChannel(chnl_usr);//文件目录请求
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
493
mms/mmsclient.c
493
mms/mmsclient.c
@@ -80,6 +80,9 @@
|
|||||||
|
|
||||||
#include <ctype.h> //lnk20241119
|
#include <ctype.h> //lnk20241119
|
||||||
#include "../cfg_parse/custom_printf.h"//lnk20250225
|
#include "../cfg_parse/custom_printf.h"//lnk20250225
|
||||||
|
|
||||||
|
#include "../log4cplus/log4.h"
|
||||||
|
|
||||||
extern uint32_t g_node_id;
|
extern uint32_t g_node_id;
|
||||||
extern char subdir[128];
|
extern char subdir[128];
|
||||||
unsigned int g_no_auth = 0;
|
unsigned int g_no_auth = 0;
|
||||||
@@ -132,6 +135,8 @@ IDENT_RESP_INFO identify_response_info =
|
|||||||
|
|
||||||
/************************************************************************/
|
/************************************************************************/
|
||||||
|
|
||||||
|
extern pt61850app_t *g_pt61850app;
|
||||||
|
|
||||||
extern TP0_CONN *tp0_conn_arr; /* ptr to array of "max_num_conns" structs */
|
extern TP0_CONN *tp0_conn_arr; /* ptr to array of "max_num_conns" structs */
|
||||||
|
|
||||||
static ST_VOID disc_ind_fun (MVL_NET_INFO *cc, ST_INT discType);
|
static ST_VOID disc_ind_fun (MVL_NET_INFO *cc, ST_INT discType);
|
||||||
@@ -269,12 +274,25 @@ MY_CONTROL_INFO my_control_info;
|
|||||||
ST_INT mms_var_type_id_create (MVL_NET_INFO *clientNetInfo, ST_INT scope,
|
ST_INT mms_var_type_id_create (MVL_NET_INFO *clientNetInfo, ST_INT scope,
|
||||||
ST_CHAR *dom_name, ST_CHAR *var_name, int iTimeOut)
|
ST_CHAR *dom_name, ST_CHAR *var_name, int iTimeOut)
|
||||||
{
|
{
|
||||||
MVL_REQ_PEND *reqCtrl;
|
//MVL_REQ_PEND *reqCtrl;
|
||||||
|
//reqCtrl 必须初始化为 NULL,防止 mvla_getvar 失败后释放野指针
|
||||||
|
MVL_REQ_PEND *reqCtrl = NULL;
|
||||||
|
|
||||||
GETVAR_REQ_INFO getvar_req;
|
GETVAR_REQ_INFO getvar_req;
|
||||||
ST_INT type_id = -1; /* start with invalid type id */
|
ST_INT type_id = -1; /* start with invalid type id */
|
||||||
ST_RET ret;
|
ST_RET ret;
|
||||||
|
|
||||||
|
//参数合法性检查
|
||||||
|
if (clientNetInfo == NULL || dom_name == NULL || var_name == NULL)
|
||||||
|
{
|
||||||
|
printf("[GETVAR] invalid arg netInfo=%p dom=%p var=%p\n",
|
||||||
|
clientNetInfo, dom_name, var_name);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
//结构体清零,避免未初始化字段导致异常
|
||||||
|
memset(&getvar_req, 0, sizeof(getvar_req));
|
||||||
|
|
||||||
/* Get the type of this "Oper" attribute & create type. */
|
/* Get the type of this "Oper" attribute & create type. */
|
||||||
/* Would be more efficient to do this just once before this function.*/
|
/* Would be more efficient to do this just once before this function.*/
|
||||||
getvar_req.req_tag = GETVAR_NAME;
|
getvar_req.req_tag = GETVAR_NAME;
|
||||||
@@ -284,18 +302,52 @@ ST_RET ret;
|
|||||||
getvar_req.name.domain_id= dom_name;
|
getvar_req.name.domain_id= dom_name;
|
||||||
getvar_req.name.obj_name.vmd_spec = var_name;
|
getvar_req.name.obj_name.vmd_spec = var_name;
|
||||||
|
|
||||||
|
//增加调试打印,确认崩溃点
|
||||||
|
printf("[GETVAR] start dom=%s var=%s\n", dom_name, var_name);
|
||||||
|
|
||||||
ret = mvla_getvar (clientNetInfo, &getvar_req, &reqCtrl);
|
ret = mvla_getvar (clientNetInfo, &getvar_req, &reqCtrl);
|
||||||
if (ret == SD_SUCCESS)
|
|
||||||
|
//打印 mvla_getvar 返回值和 reqCtrl
|
||||||
|
printf("[GETVAR] mvla_getvar ret=0x%X reqCtrl=%p\n", ret, reqCtrl);
|
||||||
|
|
||||||
|
if (ret == SD_SUCCESS){
|
||||||
ret = waitReqDone (reqCtrl, iTimeOut);
|
ret = waitReqDone (reqCtrl, iTimeOut);
|
||||||
if (ret != SD_SUCCESS)
|
//打印 waitReqDone 返回值
|
||||||
|
printf("[GETVAR] wait ret=0x%X\n", ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret != SD_SUCCESS){
|
||||||
echo_warn2 ("Error getting type of variable '%s' in domain '%s'\n", var_name, dom_name);
|
echo_warn2 ("Error getting type of variable '%s' in domain '%s'\n", var_name, dom_name);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* Don't care about name so pass NULL. */
|
/* Don't care about name so pass NULL. */
|
||||||
type_id = mvl_type_id_create (NULL, reqCtrl->u.getvar.resp_info->type_spec.data,
|
//type_id = mvl_type_id_create (NULL, reqCtrl->u.getvar.resp_info->type_spec.data,
|
||||||
|
// reqCtrl->u.getvar.resp_info->type_spec.len);
|
||||||
|
//严格检查 resp_info/type_spec,避免空指针崩溃
|
||||||
|
if (ret == SD_SUCCESS &&
|
||||||
|
reqCtrl != NULL &&
|
||||||
|
reqCtrl->u.getvar.resp_info != NULL &&
|
||||||
|
reqCtrl->u.getvar.resp_info->type_spec.data != NULL &&
|
||||||
|
reqCtrl->u.getvar.resp_info->type_spec.len > 0)
|
||||||
|
{
|
||||||
|
type_id = mvl_type_id_create(
|
||||||
|
NULL,
|
||||||
|
reqCtrl->u.getvar.resp_info->type_spec.data,
|
||||||
|
reqCtrl->u.getvar.resp_info->type_spec.len);
|
||||||
|
|
||||||
|
printf("[GETVAR] create type_id=%d len=%d\n",
|
||||||
|
type_id,
|
||||||
reqCtrl->u.getvar.resp_info->type_spec.len);
|
reqCtrl->u.getvar.resp_info->type_spec.len);
|
||||||
}
|
}
|
||||||
mvl_free_req_ctrl (reqCtrl); /* Done with request struct */
|
else
|
||||||
|
{
|
||||||
|
printf("[GETVAR] failed dom=%s var=%s ret=0x%X\n",
|
||||||
|
dom_name, var_name, ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//只有 reqCtrl 非空才释放
|
||||||
|
if (reqCtrl != NULL)mvl_free_req_ctrl (reqCtrl); /* Done with request struct */
|
||||||
return (type_id);
|
return (type_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1683,20 +1735,20 @@ static ST_VOID *my_realloc_err (ST_VOID *old, ST_UINT size)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////
|
||||||
//#define MAX_FILE_HANDLE_NUM (256)
|
#define MAX_FILE_HANDLE_NUM (256)
|
||||||
//static FILE *fp_arr[MAX_FILE_HANDLE_NUM];
|
static FILE *fp_arr[MAX_FILE_HANDLE_NUM];
|
||||||
//static ST_INT32 cur_handle = 0;
|
static ST_INT32 cur_handle = 0;
|
||||||
//ST_INT32 set_file_pointer( FILE *fp)
|
ST_INT32 set_file_pointer( FILE *fp)
|
||||||
//{
|
{
|
||||||
// ST_INT32 the_handle = cur_handle;
|
ST_INT32 the_handle = cur_handle;
|
||||||
// fp_arr[cur_handle++] = fp;
|
fp_arr[cur_handle++] = fp;
|
||||||
// cur_handle %= MAX_FILE_HANDLE_NUM;
|
cur_handle %= MAX_FILE_HANDLE_NUM;
|
||||||
// return the_handle;
|
return the_handle;
|
||||||
//}
|
}
|
||||||
//FILE* get_file_pointer(ST_INT32 handle)
|
FILE* get_file_pointer(ST_INT32 handle)
|
||||||
//{
|
{
|
||||||
// return fp_arr[handle];
|
return fp_arr[handle];
|
||||||
//}
|
}
|
||||||
///////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#if (MMS_FOPEN_EN & RESP_EN)
|
#if (MMS_FOPEN_EN & RESP_EN)
|
||||||
@@ -1705,39 +1757,39 @@ static ST_VOID *my_realloc_err (ST_VOID *old, ST_UINT size)
|
|||||||
/************************************************************************/
|
/************************************************************************/
|
||||||
ST_VOID u_mvl_fopen_ind (MVL_IND_PEND *indCtrl)
|
ST_VOID u_mvl_fopen_ind (MVL_IND_PEND *indCtrl)
|
||||||
{
|
{
|
||||||
//FILE *fp;
|
FILE *fp;
|
||||||
//FOPEN_RESP_INFO resp_info;
|
FOPEN_RESP_INFO resp_info;
|
||||||
//struct stat stat_buf;
|
struct stat stat_buf;
|
||||||
|
|
||||||
//fp = fopen (indCtrl->u.fopen.filename, "rb"); /* CRITICAL: use "b" flag for binary transfer*/
|
fp = fopen (indCtrl->u.fopen.filename, "rb"); /* CRITICAL: use "b" flag for binary transfer*/
|
||||||
//if (fp == NULL)
|
if (fp == NULL)
|
||||||
// {
|
{
|
||||||
// _mplas_err_resp (indCtrl,11,6); /* File-access denied */
|
_mplas_err_resp (indCtrl,11,6); /* File-access denied */
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
//if (fseek (fp, indCtrl->u.fopen.init_pos, SEEK_SET))
|
if (fseek (fp, indCtrl->u.fopen.init_pos, SEEK_SET))
|
||||||
// {
|
{
|
||||||
// _mplas_err_resp (indCtrl,11,5); /* Position invalid */
|
_mplas_err_resp (indCtrl,11,5); /* Position invalid */
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
|
|
||||||
///* WARNING: this only works if (FILE *) is a 32-bit pointer. */
|
/* WARNING: this only works if (FILE *) is a 32-bit pointer. */
|
||||||
//resp_info.frsmid = set_file_pointer(fp); //(ST_INT32) fp;
|
resp_info.frsmid = set_file_pointer(fp); //(ST_INT32) fp;
|
||||||
|
|
||||||
//if (fstat (fileno (fp), &stat_buf))
|
if (fstat (fileno (fp), &stat_buf))
|
||||||
// { /* Can't get file size or time */
|
{ /* Can't get file size or time */
|
||||||
// _mplas_err_resp (indCtrl,11,0); /* File Problem, Other */
|
_mplas_err_resp (indCtrl,11,0); /* File Problem, Other */
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
//else
|
else
|
||||||
// {
|
{
|
||||||
// resp_info.ent.fsize = stat_buf.st_size;
|
resp_info.ent.fsize = stat_buf.st_size;
|
||||||
// resp_info.ent.mtimpres = SD_TRUE;
|
resp_info.ent.mtimpres = SD_TRUE;
|
||||||
// resp_info.ent.mtime = stat_buf.st_mtime;
|
resp_info.ent.mtime = stat_buf.st_mtime;
|
||||||
// }
|
}
|
||||||
|
|
||||||
//indCtrl->u.fopen.resp_info = &resp_info;
|
indCtrl->u.fopen.resp_info = &resp_info;
|
||||||
//mplas_fopen_resp (indCtrl);
|
mplas_fopen_resp (indCtrl);
|
||||||
}
|
}
|
||||||
#endif /* MMS_FOPEN_EN & RESP_EN */
|
#endif /* MMS_FOPEN_EN & RESP_EN */
|
||||||
|
|
||||||
@@ -1748,31 +1800,31 @@ ST_VOID u_mvl_fopen_ind (MVL_IND_PEND *indCtrl)
|
|||||||
/************************************************************************/
|
/************************************************************************/
|
||||||
ST_VOID u_mvl_fread_ind (MVL_IND_PEND *indCtrl)
|
ST_VOID u_mvl_fread_ind (MVL_IND_PEND *indCtrl)
|
||||||
{
|
{
|
||||||
//FILE *fp;
|
FILE *fp;
|
||||||
//ST_UCHAR *tmp_buf;
|
ST_UCHAR *tmp_buf;
|
||||||
//MVLAS_FREAD_CTRL *fread_ctrl = &indCtrl->u.fread;
|
MVLAS_FREAD_CTRL *fread_ctrl = &indCtrl->u.fread;
|
||||||
//FREAD_RESP_INFO resp_info;
|
FREAD_RESP_INFO resp_info;
|
||||||
|
|
||||||
//fp = get_file_pointer(fread_ctrl->req_info->frsmid);// (FILE *) fread_ctrl->req_info->frsmid;
|
fp = get_file_pointer(fread_ctrl->req_info->frsmid);// (FILE *) fread_ctrl->req_info->frsmid;
|
||||||
///* Do NOT read more than "max_size". */
|
/* Do NOT read more than "max_size". */
|
||||||
//tmp_buf = (ST_UCHAR *) chk_malloc (fread_ctrl->max_size);
|
tmp_buf = (ST_UCHAR *) chk_malloc (fread_ctrl->max_size);
|
||||||
|
|
||||||
//resp_info.fd_len = fread (tmp_buf, 1, fread_ctrl->max_size, fp);
|
resp_info.fd_len = fread (tmp_buf, 1, fread_ctrl->max_size, fp);
|
||||||
//if (resp_info.fd_len == 0 && ferror (fp))
|
if (resp_info.fd_len == 0 && ferror (fp))
|
||||||
// {
|
{
|
||||||
// _mplas_err_resp (indCtrl, 3, 0);
|
_mplas_err_resp (indCtrl, 3, 0);
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
|
|
||||||
//resp_info.filedata = tmp_buf;
|
resp_info.filedata = tmp_buf;
|
||||||
//if (resp_info.fd_len == fread_ctrl->max_size)
|
if (resp_info.fd_len == fread_ctrl->max_size)
|
||||||
// resp_info.more_follows = SD_TRUE;
|
resp_info.more_follows = SD_TRUE;
|
||||||
//else
|
else
|
||||||
// resp_info.more_follows = SD_FALSE;
|
resp_info.more_follows = SD_FALSE;
|
||||||
|
|
||||||
//fread_ctrl->resp_info = &resp_info;
|
fread_ctrl->resp_info = &resp_info;
|
||||||
//mplas_fread_resp (indCtrl);
|
mplas_fread_resp (indCtrl);
|
||||||
//chk_free (tmp_buf); /* Temporary buffer */
|
chk_free (tmp_buf); /* Temporary buffer */
|
||||||
}
|
}
|
||||||
#endif /* #if (MMS_FREAD_EN & RESP_EN) */
|
#endif /* #if (MMS_FREAD_EN & RESP_EN) */
|
||||||
|
|
||||||
@@ -1782,15 +1834,15 @@ ST_VOID u_mvl_fread_ind (MVL_IND_PEND *indCtrl)
|
|||||||
/************************************************************************/
|
/************************************************************************/
|
||||||
ST_VOID u_mvl_fclose_ind (MVL_IND_PEND *indCtrl)
|
ST_VOID u_mvl_fclose_ind (MVL_IND_PEND *indCtrl)
|
||||||
{
|
{
|
||||||
//FILE *fp;
|
FILE *fp;
|
||||||
//MVLAS_FCLOSE_CTRL *fclose_ctrl = &indCtrl->u.fclose;
|
MVLAS_FCLOSE_CTRL *fclose_ctrl = &indCtrl->u.fclose;
|
||||||
|
|
||||||
//fp = get_file_pointer(fclose_ctrl->req_info->frsmid);//(FILE *) fclose_ctrl->req_info->frsmid;
|
fp = get_file_pointer(fclose_ctrl->req_info->frsmid);//(FILE *) fclose_ctrl->req_info->frsmid;
|
||||||
|
|
||||||
//if (fclose (fp))
|
if (fclose (fp))
|
||||||
// _mplas_err_resp (indCtrl, 11, 0); /* File problem, other */
|
_mplas_err_resp (indCtrl, 11, 0); /* File problem, other */
|
||||||
//else
|
else
|
||||||
// mplas_fclose_resp (indCtrl);
|
mplas_fclose_resp (indCtrl);
|
||||||
}
|
}
|
||||||
#endif /* #if (MMS_FCLOSE_EN & RESP_EN) */
|
#endif /* #if (MMS_FCLOSE_EN & RESP_EN) */
|
||||||
|
|
||||||
@@ -1945,6 +1997,295 @@ MVL_REQ_PEND *reqCtrl;
|
|||||||
return (ret);
|
return (ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//lnk20260508添加重启装置函数
|
||||||
|
int BuildResetDomName(ied_usr_t *ied_usr, char *domName, size_t domNameSize)
|
||||||
|
{
|
||||||
|
if (ied_usr == NULL || domName == NULL || domNameSize == 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
domName[0] = '\0';
|
||||||
|
|
||||||
|
XmlConfigC cfg1;
|
||||||
|
memset(&cfg1, 0, sizeof(cfg1));
|
||||||
|
|
||||||
|
if (get_xml_config_by_dev_type(ied_usr->dev_type, &cfg1))
|
||||||
|
{
|
||||||
|
printf("========== XmlConfigC dump ==========\n");
|
||||||
|
printf("IEDControl = '%s'\n", cfg1.IEDControl);
|
||||||
|
printf("IEDname = '%s'\n", cfg1.IEDname);
|
||||||
|
printf("LDevicePrefix = '%s'\n", cfg1.LDevicePrefix);
|
||||||
|
printf("=====================================\n");
|
||||||
|
|
||||||
|
if (cfg1.IEDControl[0] != '\0')
|
||||||
|
{
|
||||||
|
snprintf(domName, domNameSize, "%s", cfg1.IEDControl);
|
||||||
|
printf("[RESET] use cfg1.IEDControl=%s\n", domName);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ied_usr->LD_info && ied_usr->LD_info[0].LD_name)
|
||||||
|
{
|
||||||
|
snprintf(domName, domNameSize, "%s", ied_usr->LD_info[0].LD_name);
|
||||||
|
|
||||||
|
int len = strlen(domName);
|
||||||
|
if (len > 0 && isdigit(domName[len - 1]))
|
||||||
|
domName[len - 1] = '0';
|
||||||
|
|
||||||
|
printf("[RESET] use LD_name domName=%s\n", domName);
|
||||||
|
|
||||||
|
DIY_WARNLOG_CODE(ied_usr->terminal_id, 1, LOG_CODE_FILE_CONTROL,
|
||||||
|
"【WARN】未取到 IEDControl 信息,使用 LD_name=%s terminal_id=%s",
|
||||||
|
domName, ied_usr->terminal_id);
|
||||||
|
|
||||||
|
return -2;
|
||||||
|
}
|
||||||
|
|
||||||
|
snprintf(domName, domNameSize, "%s", "PQMonitorPQM0");
|
||||||
|
printf("[RESET] use default domName=%s\n", domName);
|
||||||
|
DIY_ERRORLOG_CODE(ied_usr->terminal_id, 1, LOG_CODE_FILE_CONTROL,
|
||||||
|
"【ERROR】未取到 LD 信息,使用默认 domName=%s terminal_id=%s",
|
||||||
|
domName, ied_usr->terminal_id);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ST_INT ledrs_var_type_create(MVL_NET_INFO* net_info,
|
||||||
|
OBJECT_NAME* varObj,
|
||||||
|
ST_INT timeOut)
|
||||||
|
{
|
||||||
|
MVL_REQ_PEND* reqCtrl;
|
||||||
|
GETVAR_REQ_INFO getvar_req;
|
||||||
|
VAR_ACC_TSPEC* type_spec;
|
||||||
|
ST_INT type_id = -1;
|
||||||
|
ST_RET ret;
|
||||||
|
|
||||||
|
memset(&getvar_req, 0, sizeof(getvar_req));
|
||||||
|
|
||||||
|
getvar_req.req_tag = GETVAR_NAME;
|
||||||
|
getvar_req.name = *varObj;
|
||||||
|
|
||||||
|
ret = mvla_getvar(net_info, &getvar_req, &reqCtrl);
|
||||||
|
|
||||||
|
if (ret == SD_SUCCESS)
|
||||||
|
ret = waitReqDone(reqCtrl, timeOut);
|
||||||
|
|
||||||
|
if (ret == SD_SUCCESS)
|
||||||
|
{
|
||||||
|
type_spec = &reqCtrl->u.getvar.resp_info->type_spec;
|
||||||
|
|
||||||
|
type_id = mvl_type_id_create(NULL,
|
||||||
|
type_spec->data,
|
||||||
|
type_spec->len);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (reqCtrl)
|
||||||
|
mvl_free_req_ctrl(reqCtrl);
|
||||||
|
|
||||||
|
return type_id;
|
||||||
|
}
|
||||||
|
static ST_INT create_oper_type_id(MVL_NET_INFO *net_info,
|
||||||
|
ST_CHAR *domName,
|
||||||
|
const char *ctlName,
|
||||||
|
ST_INT timeOut)
|
||||||
|
{
|
||||||
|
OBJECT_NAME obj;
|
||||||
|
ST_CHAR varName[MAX_IDENT_LEN + 1];
|
||||||
|
|
||||||
|
if (net_info == NULL || domName == NULL || ctlName == NULL)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
memset(&obj, 0, sizeof(obj));
|
||||||
|
|
||||||
|
apr_snprintf(varName,
|
||||||
|
sizeof(varName),
|
||||||
|
"LLN0$%s",
|
||||||
|
ctlName);
|
||||||
|
|
||||||
|
obj.object_tag = DOM_SPEC;
|
||||||
|
obj.domain_id = domName;
|
||||||
|
obj.obj_name.vmd_spec = varName;
|
||||||
|
|
||||||
|
printf("[CTRL_INIT] create type dom=%s var=%s\n",
|
||||||
|
domName, varName);
|
||||||
|
|
||||||
|
return ledrs_var_type_create(net_info, &obj, timeOut);
|
||||||
|
}
|
||||||
|
void InitLedrsOperTypeForChannel(chnl_usr_t *chnl_usr)
|
||||||
|
{
|
||||||
|
printf("[CTRL_INIT] enter\n");
|
||||||
|
|
||||||
|
if (chnl_usr == NULL || chnl_usr->chnl == NULL ||
|
||||||
|
chnl_usr->chnl->ied == NULL || chnl_usr->net_info == NULL)
|
||||||
|
{
|
||||||
|
printf("[CTRL_INIT] invalid chnl_usr\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ied_t *ied = chnl_usr->chnl->ied;
|
||||||
|
ied_usr_t *ied_usr = GET_IEDEXT_ADDR(ied);
|
||||||
|
|
||||||
|
if (ied_usr == NULL || ied_usr->LD_info == NULL)
|
||||||
|
{
|
||||||
|
printf("[CTRL_INIT] invalid ied_usr or LD_info\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("[CTRL_INIT] current inited=%d ledrs=%d reboot=%d reset=%d\n",
|
||||||
|
(int)ied_usr->oper_type_cache.inited,
|
||||||
|
ied_usr->oper_type_cache.ledrs_oper_type_id,
|
||||||
|
ied_usr->oper_type_cache.reboot_oper_type_id,
|
||||||
|
ied_usr->oper_type_cache.reset_oper_type_id);
|
||||||
|
|
||||||
|
if (ied_usr->oper_type_cache.inited == SD_TRUE)
|
||||||
|
{
|
||||||
|
printf("[CTRL_INIT] already inited\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
char domName[256] = {0};
|
||||||
|
|
||||||
|
BuildResetDomName(ied_usr,
|
||||||
|
domName,
|
||||||
|
sizeof(domName));
|
||||||
|
|
||||||
|
printf("[CTRL_INIT] final dom=%s\n", domName);
|
||||||
|
|
||||||
|
ied_usr->oper_type_cache.ledrs_oper_type_id =
|
||||||
|
create_oper_type_id(chnl_usr->net_info,
|
||||||
|
domName,
|
||||||
|
"CO$LEDRs$Oper",
|
||||||
|
g_pt61850app->mmsOpTimeout);
|
||||||
|
|
||||||
|
printf("[CTRL_INIT] LEDRs type_id=%d\n",
|
||||||
|
ied_usr->oper_type_cache.ledrs_oper_type_id);
|
||||||
|
|
||||||
|
ied_usr->oper_type_cache.reboot_oper_type_id =
|
||||||
|
create_oper_type_id(chnl_usr->net_info,
|
||||||
|
domName,
|
||||||
|
"CO$Reboot$Oper",
|
||||||
|
g_pt61850app->mmsOpTimeout);
|
||||||
|
|
||||||
|
printf("[CTRL_INIT] Reboot type_id=%d\n",
|
||||||
|
ied_usr->oper_type_cache.reboot_oper_type_id);
|
||||||
|
|
||||||
|
ied_usr->oper_type_cache.reset_oper_type_id =
|
||||||
|
create_oper_type_id(chnl_usr->net_info,
|
||||||
|
domName,
|
||||||
|
"ST$Mod$stVal",
|
||||||
|
g_pt61850app->mmsOpTimeout);
|
||||||
|
|
||||||
|
printf("[CTRL_INIT] Reset type_id=%d\n",
|
||||||
|
ied_usr->oper_type_cache.reset_oper_type_id);
|
||||||
|
|
||||||
|
/* 无论成功失败,都不再重复初始化 */
|
||||||
|
ied_usr->oper_type_cache.inited = SD_TRUE;
|
||||||
|
|
||||||
|
printf("[CTRL_INIT] finish inited=%d\n",
|
||||||
|
(int)ied_usr->oper_type_cache.inited);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ST_RET write_common_oper(chnl_usr_t *chnl_usr,
|
||||||
|
ST_CHAR *domName,
|
||||||
|
const char *ctlName,
|
||||||
|
ST_INT oper_type_id,
|
||||||
|
ST_INT timeOut)
|
||||||
|
{
|
||||||
|
if (chnl_usr == NULL ||
|
||||||
|
chnl_usr->net_info == NULL ||
|
||||||
|
domName == NULL ||
|
||||||
|
ctlName == NULL ||
|
||||||
|
oper_type_id < 0)
|
||||||
|
{
|
||||||
|
printf("[OPER_WRITE] invalid param\n");
|
||||||
|
return SD_FAILURE;
|
||||||
|
}
|
||||||
|
|
||||||
|
ST_CHAR varName[MAX_IDENT_LEN + 1];
|
||||||
|
apr_snprintf(varName,
|
||||||
|
sizeof(varName),
|
||||||
|
"LLN0$%s",
|
||||||
|
ctlName);
|
||||||
|
|
||||||
|
Control_Oper_t oper;
|
||||||
|
memset(&oper, 0, sizeof(oper));
|
||||||
|
|
||||||
|
oper.ctlVal = SD_TRUE;
|
||||||
|
oper.origin.orCat = 3;
|
||||||
|
oper.origin.orIdent.len = 0;
|
||||||
|
oper.ctlNum = 1;
|
||||||
|
u_get_current_utc_time(&oper.T);
|
||||||
|
oper.Test = SD_FALSE;
|
||||||
|
oper.Check[0] = 0x00;
|
||||||
|
oper.Check[1] = 0x00;
|
||||||
|
|
||||||
|
if ((int)sizeof(Control_Oper_t) !=
|
||||||
|
mvl_type_ctrl[oper_type_id].data_size)
|
||||||
|
{
|
||||||
|
printf("[OPER_WRITE] SIZE MISMATCH ctl=%s local=%d runtime=%d\n",
|
||||||
|
ctlName,
|
||||||
|
(int)sizeof(Control_Oper_t),
|
||||||
|
mvl_type_ctrl[oper_type_id].data_size);
|
||||||
|
return SD_FAILURE;
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("[OPER_WRITE] dom=%s var=%s type_id=%d\n",
|
||||||
|
domName, varName, oper_type_id);
|
||||||
|
|
||||||
|
return mms_named_var_write(chnl_usr->net_info,
|
||||||
|
varName,
|
||||||
|
DOM_SPEC,
|
||||||
|
domName,
|
||||||
|
oper_type_id,
|
||||||
|
(ST_CHAR *)&oper,
|
||||||
|
timeOut);
|
||||||
|
}
|
||||||
|
|
||||||
|
ST_RET mms_conclude_disconnect(MVL_NET_INFO *net_info, ST_INT timeOut)
|
||||||
|
{
|
||||||
|
MVL_REQ_PEND *reqCtrl = NULL;
|
||||||
|
ST_RET ret;
|
||||||
|
|
||||||
|
if (net_info == NULL)
|
||||||
|
return SD_FAILURE;
|
||||||
|
|
||||||
|
printf("[RESET] before mvl_concl\n");
|
||||||
|
|
||||||
|
ret = mvl_concl(net_info, &reqCtrl);
|
||||||
|
|
||||||
|
printf("[RESET] after mvl_concl ret=0x%X reqCtrl=%p\n",
|
||||||
|
ret, reqCtrl);
|
||||||
|
|
||||||
|
if (ret == SD_SUCCESS && reqCtrl != NULL)
|
||||||
|
{
|
||||||
|
ret = waitReqDone(reqCtrl, timeOut);
|
||||||
|
|
||||||
|
printf("[RESET] conclude wait ret=0x%X\n", ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (reqCtrl != NULL)
|
||||||
|
mvl_free_req_ctrl(reqCtrl);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
ST_RET write_mod_stval(chnl_usr_t *chnl_usr,
|
||||||
|
ST_CHAR *domName,
|
||||||
|
ST_INT timeOut)
|
||||||
|
{
|
||||||
|
ST_CHAR varName[MAX_IDENT_LEN + 1];
|
||||||
|
ST_INT16 value = 88;
|
||||||
|
|
||||||
|
apr_snprintf(varName, sizeof(varName), "LLN0$ST$Mod$stVal");
|
||||||
|
|
||||||
|
return mms_named_var_write(chnl_usr->net_info,
|
||||||
|
varName,
|
||||||
|
DOM_SPEC,
|
||||||
|
domName,
|
||||||
|
14, //int
|
||||||
|
(ST_CHAR *)&value,
|
||||||
|
timeOut);
|
||||||
|
}
|
||||||
|
|
||||||
/************************************************************************/
|
/************************************************************************/
|
||||||
/* init_log_cfg */
|
/* init_log_cfg */
|
||||||
|
|||||||
@@ -320,6 +320,17 @@ struct LD_info_t{
|
|||||||
//录波
|
//录波
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//装置控制初始化
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
ST_BOOLEAN inited;
|
||||||
|
|
||||||
|
ST_INT ledrs_oper_type_id;
|
||||||
|
ST_INT reboot_oper_type_id;
|
||||||
|
ST_INT reset_oper_type_id;
|
||||||
|
|
||||||
|
} MMS_OPER_TYPE_CACHE;
|
||||||
|
|
||||||
struct ied_usr_t{
|
struct ied_usr_t{
|
||||||
LD_info_t *LD_info; /**< LD数组 */
|
LD_info_t *LD_info; /**< LD数组 */
|
||||||
int dev_idx; /**< 设备序号 */
|
int dev_idx; /**< 设备序号 */
|
||||||
@@ -346,6 +357,8 @@ struct ied_usr_t{
|
|||||||
|
|
||||||
bool lastconnectstat;//lnk20250704
|
bool lastconnectstat;//lnk20250704
|
||||||
bool has_logged_disconnect;//lnk20250704
|
bool has_logged_disconnect;//lnk20250704
|
||||||
|
|
||||||
|
MMS_OPER_TYPE_CACHE oper_type_cache;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@@ -533,6 +546,35 @@ int parse_file_names_by_fltnum(int fltnum, char* domname, char** filenames, int
|
|||||||
QVVR_t* find_qvvr_by_trig_tm(LD_info_t* LD_info,long long trig_tm);
|
QVVR_t* find_qvvr_by_trig_tm(LD_info_t* LD_info,long long trig_tm);
|
||||||
|
|
||||||
void HandleFileDirReqForChannel(chnl_usr_t *chnl_usr);
|
void HandleFileDirReqForChannel(chnl_usr_t *chnl_usr);
|
||||||
|
void InitLedrsOperTypeForChannel(chnl_usr_t *chnl_usr);
|
||||||
|
|
||||||
|
//lnk20250508添加重启装置函数
|
||||||
|
//根据抓包显示oper的data结构有6个item
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
ST_BOOLEAN ctlVal;
|
||||||
|
|
||||||
|
struct
|
||||||
|
{
|
||||||
|
ST_INT16 orCat;
|
||||||
|
|
||||||
|
struct
|
||||||
|
{
|
||||||
|
ST_INT16 len;
|
||||||
|
ST_UINT8 data[64];
|
||||||
|
} orIdent;
|
||||||
|
|
||||||
|
} origin;
|
||||||
|
|
||||||
|
ST_UINT32 ctlNum;
|
||||||
|
|
||||||
|
MMS_UTC_TIME T;
|
||||||
|
|
||||||
|
ST_BOOLEAN Test;
|
||||||
|
|
||||||
|
ST_UCHAR Check[2];
|
||||||
|
|
||||||
|
} Control_Oper_t;
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|||||||
28
mykafka.ini
28
mykafka.ini
@@ -74,7 +74,7 @@ FileFlag=4
|
|||||||
FrontInst=884d132ac3a01225fcacc8c10da07d09
|
FrontInst=884d132ac3a01225fcacc8c10da07d09
|
||||||
FrontIP=192.168.1.167
|
FrontIP=192.168.1.167
|
||||||
SendFlag=3
|
SendFlag=3
|
||||||
RecallOnlyFlag=
|
RecallOnlyFlag=0
|
||||||
|
|
||||||
[Ledger]
|
[Ledger]
|
||||||
TerminalStatus="[0]"
|
TerminalStatus="[0]"
|
||||||
@@ -117,37 +117,43 @@ WriteUrl=
|
|||||||
[RocketMq]
|
[RocketMq]
|
||||||
producer=Group_producer
|
producer=Group_producer
|
||||||
Ipport=192.168.1.68:9876
|
Ipport=192.168.1.68:9876
|
||||||
Topic=TEST_Topic
|
TESTTopic=TEST_Topic
|
||||||
Tag=Test_Tag
|
TESTTag=884d132ac3a01225fcacc8c10da07d09
|
||||||
Key=Test_Keys
|
TESTKey=Test_Keys
|
||||||
Queuenum=4
|
Queuenum=4
|
||||||
|
|
||||||
Testflag=1
|
Testflag=1
|
||||||
Testnum=100
|
Testnum=0
|
||||||
Testtype=1
|
Testtype=0
|
||||||
TestPort=11000
|
TestPort=11000
|
||||||
TestList=
|
TestList=
|
||||||
|
|
||||||
consumer=Group_consumer
|
consumer=Group_consumer
|
||||||
ConsumerIpport=192.168.1.68:9876
|
ConsumerIpport=192.168.1.68:9876
|
||||||
ConsumerTopicRT=ask_real_data_topic
|
ConsumerTopicRT=ask_real_data_topic
|
||||||
ConsumerTagRT=Test_Tag
|
ConsumerTagRT=884d132ac3a01225fcacc8c10da07d09
|
||||||
ConsumerKeyRT=Test_Keys
|
ConsumerKeyRT=Test_Keys
|
||||||
ConsumerAccessKey=rmqroot
|
ConsumerAccessKey=rmqroot
|
||||||
ConsumerSecretKey=001@#njcnmq
|
ConsumerSecretKey=001@#njcnmq
|
||||||
ConsumerChannel=
|
ConsumerChannel=
|
||||||
ConsumerTopicUD=control_Topic
|
ConsumerTopicUD=control_Topic
|
||||||
ConsumerTagUD=Test_Tag
|
ConsumerTagUD=884d132ac3a01225fcacc8c10da07d09
|
||||||
ConsumerKeyUD=Test_Keys
|
ConsumerKeyUD=Test_Keys
|
||||||
ConsumerTopicRC=recall_Topic
|
ConsumerTopicRC=recall_Topic
|
||||||
ConsumerTagRC=Test_Tag
|
ConsumerTagRC=884d132ac3a01225fcacc8c10da07d09
|
||||||
ConsumerKeyRC=Test_Keys
|
ConsumerKeyRC=Test_Keys
|
||||||
ConsumerTopicSET=process_Topic
|
ConsumerTopicSET=process_Topic
|
||||||
ConsumerTagSET=Test_Tag
|
ConsumerTagSET=884d132ac3a01225fcacc8c10da07d09
|
||||||
ConsumerKeySET=Test_Keys
|
ConsumerKeySET=Test_Keys
|
||||||
ConsumerTopicLOG=ask_log_Topic
|
ConsumerTopicLOG=ask_log_Topic
|
||||||
ConsumerTagLOG=Test_Tag
|
ConsumerTagLOG=884d132ac3a01225fcacc8c10da07d09
|
||||||
ConsumerKeyLOG=Test_Keys
|
ConsumerKeyLOG=Test_Keys
|
||||||
|
ConsumerTopicFILE=File_Topic
|
||||||
|
ConsumerTagFILE=884d132ac3a01225fcacc8c10da07d09
|
||||||
|
ConsumerKeyFILE=Test_Keys
|
||||||
|
|
||||||
|
ConsumerTopicTEST=File_Topic
|
||||||
|
|
||||||
LOGTopic=log_Topic
|
LOGTopic=log_Topic
|
||||||
LOGTag=Test_Tag
|
LOGTag=Test_Tag
|
||||||
LOGKey=Test_Keys
|
LOGKey=Test_Keys
|
||||||
|
|||||||
@@ -1,21 +1,34 @@
|
|||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
#include "../json/mms_json_inter.h"
|
#include "../json/mms_json_inter.h"
|
||||||
#include "../rocketmq/CProducer.h"
|
//#include "../rocketmq/CProducer.h"
|
||||||
#include "../rocketmq/CMessage.h"
|
//#include "../rocketmq/CMessage.h"
|
||||||
#include "../rocketmq/CSendResult.h"
|
//#include "../rocketmq/CSendResult.h"
|
||||||
|
//#include "../rocketmq/CPushConsumer.h"
|
||||||
#include "../rocketmq/CPushConsumer.h"
|
#include "../rocketmq/DefaultMQProducer.h"
|
||||||
|
#include "../rocketmq/MQMessage.h"
|
||||||
|
#include "../rocketmq/SendResult.h"
|
||||||
|
#include "../rocketmq/SessionCredentials.h"
|
||||||
|
#include "../rocketmq/MQMessageExt.h"
|
||||||
|
#include "../rocketmq/ConsumeType.h"
|
||||||
|
#include "../rocketmq/MQMessageListener.h"
|
||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
using namespace rocketmq;
|
||||||
|
|
||||||
/*添加测试函数lnk10-10*/
|
/*添加测试函数lnk10-10*/
|
||||||
void producer_send0();
|
//void producer_send0();
|
||||||
void StartSendMessage(CProducer* producer,const char* strbody);
|
//void StartSendMessage(CProducer* producer,const char* strbody);
|
||||||
void producer_send(const char* strbody);
|
//void producer_send(const char* strbody);
|
||||||
void rocketmq_producer_send(const char* strbody,const char* topic);
|
//void rocketmq_producer_send(const char* strbody,const char* topic);
|
||||||
void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic);
|
//void rocketmq_StartSendMessage(CProducer* producer,const char* strbody,const char* topic);
|
||||||
|
void rocketmq_producer_send(const std::string& body,
|
||||||
|
const std::string& topic,
|
||||||
|
const std::string& tags,
|
||||||
|
const std::string& keys);
|
||||||
extern "C" {
|
extern "C" {
|
||||||
void rocketmq_test_rt();
|
void rocketmq_test_rt();
|
||||||
void rocketmq_test_ud();
|
void rocketmq_test_ud();
|
||||||
@@ -32,17 +45,25 @@ extern void my_rocketmq_send(Ckafka_data_t& data);
|
|||||||
void InitializeProducer();
|
void InitializeProducer();
|
||||||
void ShutdownAndDestroyProducer();
|
void ShutdownAndDestroyProducer();
|
||||||
//////////////////////////////////////////////////////消费者
|
//////////////////////////////////////////////////////消费者
|
||||||
void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key);
|
typedef ConsumeStatus (*MessageCallBack)(
|
||||||
void ShutdownAndDestroyConsumer();
|
const MQMessageExt& msg
|
||||||
|
);
|
||||||
|
|
||||||
struct Subscription {
|
struct Subscription {
|
||||||
std::string topic;
|
std::string topic;
|
||||||
std::string tag;
|
std::string tag;
|
||||||
MessageCallBack callback;
|
MessageCallBack callback;
|
||||||
|
|
||||||
Subscription(const std::string& t, const std::string& tg, MessageCallBack cb)
|
Subscription(const std::string& t,
|
||||||
: topic(t), tag(tg), callback(cb) {std::cout << "Subscription topic: " << topic << std::endl;}
|
const std::string& tg,
|
||||||
|
MessageCallBack cb)
|
||||||
|
: topic(t), tag(tg), callback(cb) {}
|
||||||
};
|
};
|
||||||
|
//void InitializeConsumer(const std::string& consumerName, const std::string& nameServer, const char* topic, const char* tag, const std::string& key);
|
||||||
|
void InitializeConsumer(const std::string& consumerName,
|
||||||
|
const std::string& nameServer,
|
||||||
|
const std::vector<Subscription>& subscriptions);
|
||||||
|
void ShutdownAndDestroyConsumer();
|
||||||
|
|
||||||
void rocketmq_consumer_receive(
|
void rocketmq_consumer_receive(
|
||||||
const std::string& consumerName,
|
const std::string& consumerName,
|
||||||
|
|||||||
Reference in New Issue
Block a user