add rtdata idx
This commit is contained in:
@@ -58,6 +58,9 @@ std::list<queue_data_t> queue_data_list;
|
||||
|
||||
static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者
|
||||
|
||||
std::mutex devidx_lock;
|
||||
std::unordered_map<std::string, int> devIdxMap;//实时数据用的idx
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
//前置进程
|
||||
@@ -343,7 +346,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer)
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理
|
||||
|
||||
bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit){
|
||||
bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& Idx){
|
||||
json root;
|
||||
try {
|
||||
root = json::parse(body);
|
||||
@@ -378,7 +381,8 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l
|
||||
!messageBody.contains("line") ||
|
||||
!messageBody.contains("realData") ||
|
||||
!messageBody.contains("soeData") ||
|
||||
!messageBody.contains("limit"))
|
||||
!messageBody.contains("limit")||
|
||||
!messageBody.contains("Idx"))
|
||||
{
|
||||
std::cerr << "Missing expected fields in 'messageBody'." << std::endl;
|
||||
return false;
|
||||
@@ -390,6 +394,7 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l
|
||||
realData = messageBody["realData"].get<bool>();
|
||||
soeData = messageBody["soeData"].get<bool>();
|
||||
limit = messageBody["limit"].get<int>();
|
||||
int idx = messageBody["Idx"].get<int>();
|
||||
} catch (const std::exception& e) {
|
||||
std::cerr << "Type error while extracting fields: " << e.what() << std::endl;
|
||||
return false;
|
||||
@@ -679,21 +684,22 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
|
||||
json_data.terminal_id = item.value("id", "");
|
||||
json_data.terminal_name = item.value("name", "");
|
||||
json_data.org_name = item.value("org_name", "");
|
||||
json_data.maint_name = item.value("maint_name", "");
|
||||
json_data.station_name = item.value("stationName", "");
|
||||
json_data.tmnl_factory = item.value("manufacturer", "");
|
||||
json_data.tmnl_status = item.value("status", "");
|
||||
//json_data.org_name = item.value("org_name", "");
|
||||
//json_data.maint_name = item.value("maint_name", "");
|
||||
//json_data.station_name = item.value("stationName", "");
|
||||
//json_data.tmnl_factory = item.value("manufacturer", "");
|
||||
//json_data.tmnl_status = item.value("status", "");
|
||||
json_data.dev_type = item.value("devType", "");
|
||||
json_data.dev_key = item.value("devKey", "");
|
||||
json_data.dev_series = item.value("series", "");
|
||||
//json_data.dev_key = item.value("devKey", "");
|
||||
//json_data.dev_series = item.value("series", "");
|
||||
|
||||
int procNo = item.value("processNo", -1);
|
||||
json_data.processNo = std::to_string(procNo);
|
||||
|
||||
json_data.addr_str = item.value("ip", "");
|
||||
json_data.port = item.value("port", "");
|
||||
json_data.timestamp = item.value("updateTime", "");
|
||||
//json_data.addr_str = item.value("ip", "");
|
||||
//json_data.port = item.value("port", "");
|
||||
//json_data.timestamp = item.value("updateTime", "");
|
||||
json_data.Righttime = item.value("Righttime", "");
|
||||
|
||||
if (item.contains("monitorData") && item["monitorData"].is_array()) {
|
||||
int j = 0;
|
||||
@@ -706,7 +712,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d
|
||||
m.status = monitor_item.value("status", "");
|
||||
m.logical_device_seq = monitor_item.value("lineNo", "");
|
||||
m.terminal_connect = monitor_item.value("ptType", "");
|
||||
m.timestamp = json_data.timestamp;
|
||||
//m.timestamp = json_data.timestamp;
|
||||
m.terminal_id = json_data.terminal_id;
|
||||
}
|
||||
}
|
||||
@@ -777,8 +783,9 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
|
||||
ushort line;
|
||||
bool realData = false, soeData = false;
|
||||
int limit = 0;
|
||||
int idx = 0;
|
||||
|
||||
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) {
|
||||
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit,idx)) {
|
||||
std::cerr << "Failed to parse the JSON message." << std::endl;
|
||||
DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str());
|
||||
return rocketmq::RECONSUME_LATER;
|
||||
@@ -794,10 +801,13 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms
|
||||
if (ClientManager::instance().get_dev_status(devid) != 1) {
|
||||
std::cout << "devid对应装置不在线: " << devid << std::endl;
|
||||
// 记录日志不响应 web 端
|
||||
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str());
|
||||
DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的实时数据触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str());
|
||||
return rocketmq::CONSUME_SUCCESS;
|
||||
}
|
||||
|
||||
//记录idx
|
||||
devidx_set(devid, idx);//每次下发都会更新,不加入运行用的结构体
|
||||
|
||||
ClientManager::instance().set_real_state_count(devid, 60, line);//一秒询问一次,询问60次,下一次同一个测点调用的话就会刷新
|
||||
}
|
||||
else{
|
||||
|
||||
Reference in New Issue
Block a user