diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index c34987e..2f05ca1 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -2285,7 +2285,7 @@ void print_terminal(const terminal_dev& tmnl) { std::cout << "Address: " << safe(tmnl.addr_str) << "\n"; std::cout << "Port: " << safe(tmnl.port) << "\n"; std::cout << "Timestamp: " << safe(tmnl.timestamp) << "\n"; - + std::cout << "Righttime: " << safe(tmnl.Righttime) << "\n"; std::cout << "mac: " << safe(tmnl.mac) << "\n"; for (size_t i = 0; i < 10 && !tmnl.line[i].monitor_id.empty(); ++i) { diff --git a/LFtid1056/cloudfront/code/interface.cpp b/LFtid1056/cloudfront/code/interface.cpp index 57b5bb4..87546d1 100644 --- a/LFtid1056/cloudfront/code/interface.cpp +++ b/LFtid1056/cloudfront/code/interface.cpp @@ -635,8 +635,9 @@ int terminal_ledger_web(std::map& terminal_dev_map, //dev.dev_series = safe_str(item, "series"); //dev.port = safe_str(item, "port"); //dev.timestamp = safe_str(item, "updateTime"); + dev.Righttime = safe_str(item, "Righttime"); dev.processNo = safe_str(item, "node"); - //dev.maxProcessNum = safe_str(item, "maxProcessNum"); + dev.maxProcessNum = safe_str(item, "maxProcessNum"); //dev.mac = safe_str(item, "mac");//添加mac @@ -650,7 +651,7 @@ int terminal_ledger_web(std::map& terminal_dev_map, m.logical_device_seq = safe_str(mon, "lineNo"); m.voltage_level = safe_str(mon, "voltageLevel"); m.terminal_connect = safe_str(mon, "ptType"); - m.timestamp = safe_str(mon, "updateTime"); + //m.timestamp = safe_str(mon, "updateTime"); m.status = safe_str(mon, "status"); m.CT1 = mon.value("ct1", 0.0); diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index ab7a817..dd5aa4b 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -235,7 +235,8 @@ public: std::string dev_series; std::string addr_str; //装置ip std::string port; //装置端口 - std::string timestamp; + std::string timestamp; //更新时间 + std::string Righttime; //对时 std::string processNo; std::string maxProcessNum; @@ -710,6 +711,25 @@ inline std::string now_yyyy_mm_dd_hh_mm_ss() { oss << std::put_time(&tmv, "%Y-%m-%d %H:%M:%S"); return oss.str(); } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////实时数据用 +// === 专用锁 + 数据表(仅管实时 idx 映射) === +extern std::mutex devidx_lock; // 新锁(不要用 ledgermtx) +extern std::unordered_map devIdxMap; // id -> idx,一对一 + +// === 常用操作:全部用这把锁保护 === +inline void devidx_set(const std::string& id, int idx) { + std::lock_guard lk(devidx_lock); + devIdxMap[id] = idx; // 覆盖更新 +} + +inline bool devidx_get(const std::string& id, int& out_idx) { + std::lock_guard lk(devidx_lock); + auto it = devIdxMap.find(id); + if (it == devIdxMap.end()) return false; + out_idx = it->second; + return true; +} ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// extern int g_front_seg_index; extern std::string FRONT_INST; diff --git a/LFtid1056/cloudfront/code/log4.cpp b/LFtid1056/cloudfront/code/log4.cpp index 7b3453b..bd4567d 100644 --- a/LFtid1056/cloudfront/code/log4.cpp +++ b/LFtid1056/cloudfront/code/log4.cpp @@ -324,7 +324,7 @@ void init_loggers_bydevid(const std::string& dev_id) Logger device_logger = init_logger(device_key, device_dir, dev_id, device_appender); logger_map[device_key] = TypedLogger(device_logger, LOGTYPE_DATA); - DIY_WARNLOG(dev_id.c_str(), "【WARN】终端id:%s终端级日志初始化完毕", term.terminal_id.c_str()); + //DIY_WARNLOG(dev_id.c_str(), "【WARN】终端id:%s终端级日志初始化完毕", term.terminal_id.c_str()); } // 初始化监测点日志,monitor..COM / .DATA @@ -349,7 +349,7 @@ void init_loggers_bydevid(const std::string& dev_id) Logger mon_logger = init_logger(mon_key.str(), mon_path.str(), mon_name.str(), monitor_appender); logger_map[mon_key.str()] = TypedLogger(mon_logger, LOGTYPE_DATA); - DIY_WARNLOG(monitor.monitor_id.c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str()); + //DIY_WARNLOG(monitor.monitor_id.c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str()); } } } @@ -389,7 +389,7 @@ void init_loggers() logger_map[device_key] = TypedLogger(device_logger, LOGTYPE_DATA); - DIY_WARNLOG(term.terminal_id.c_str(), "【WARN】终端id:%s终端级日志初始化完毕", term.terminal_id.c_str()); + //DIY_WARNLOG(term.terminal_id.c_str(), "【WARN】终端id:%s终端级日志初始化完毕", term.terminal_id.c_str()); // 初始化监测点日志 for (size_t i = 0; i < term.line.size(); ++i) { @@ -412,8 +412,8 @@ void init_loggers() logger_map[mon_key.str()] = TypedLogger(mon_logger, LOGTYPE_DATA); - DIY_WARNLOG(mon_key.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", - monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str()); + //DIY_WARNLOG(mon_key.str().c_str(), "【WARN】监测点:%s - id:%s监测点级日志初始化完毕", + //monitor.monitor_name.c_str(), monitor.logical_device_seq.c_str()); } } } diff --git a/LFtid1056/cloudfront/code/rocketmq.cpp b/LFtid1056/cloudfront/code/rocketmq.cpp index 9ac2037..e4b28e6 100644 --- a/LFtid1056/cloudfront/code/rocketmq.cpp +++ b/LFtid1056/cloudfront/code/rocketmq.cpp @@ -58,6 +58,9 @@ std::list queue_data_list; static rocketmq::RocketMQProducer* g_producer = nullptr; //生产者 +std::mutex devidx_lock; +std::unordered_map devIdxMap;//实时数据用的idx + /////////////////////////////////////////////////////////////////////////////////////////////////////////// //前置进程 @@ -343,7 +346,7 @@ void my_rocketmq_send(queue_data_t& data,rocketmq::RocketMQProducer* producer) /////////////////////////////////////////////////////////////////////////////////////////////////回调函数的json处理 -bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit){ +bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& line,bool& realData,bool& soeData,int& limit,int& Idx){ json root; try { root = json::parse(body); @@ -378,7 +381,8 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l !messageBody.contains("line") || !messageBody.contains("realData") || !messageBody.contains("soeData") || - !messageBody.contains("limit")) + !messageBody.contains("limit")|| + !messageBody.contains("Idx")) { std::cerr << "Missing expected fields in 'messageBody'." << std::endl; return false; @@ -390,6 +394,7 @@ bool parseJsonMessageRT(const std::string& body,std::string& devSeries,ushort& l realData = messageBody["realData"].get(); soeData = messageBody["soeData"].get(); limit = messageBody["limit"].get(); + int idx = messageBody["Idx"].get(); } catch (const std::exception& e) { std::cerr << "Type error while extracting fields: " << e.what() << std::endl; return false; @@ -679,21 +684,22 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d json_data.terminal_id = item.value("id", ""); json_data.terminal_name = item.value("name", ""); - json_data.org_name = item.value("org_name", ""); - json_data.maint_name = item.value("maint_name", ""); - json_data.station_name = item.value("stationName", ""); - json_data.tmnl_factory = item.value("manufacturer", ""); - json_data.tmnl_status = item.value("status", ""); + //json_data.org_name = item.value("org_name", ""); + //json_data.maint_name = item.value("maint_name", ""); + //json_data.station_name = item.value("stationName", ""); + //json_data.tmnl_factory = item.value("manufacturer", ""); + //json_data.tmnl_status = item.value("status", ""); json_data.dev_type = item.value("devType", ""); - json_data.dev_key = item.value("devKey", ""); - json_data.dev_series = item.value("series", ""); + //json_data.dev_key = item.value("devKey", ""); + //json_data.dev_series = item.value("series", ""); int procNo = item.value("processNo", -1); json_data.processNo = std::to_string(procNo); - json_data.addr_str = item.value("ip", ""); - json_data.port = item.value("port", ""); - json_data.timestamp = item.value("updateTime", ""); + //json_data.addr_str = item.value("ip", ""); + //json_data.port = item.value("port", ""); + //json_data.timestamp = item.value("updateTime", ""); + json_data.Righttime = item.value("Righttime", ""); if (item.contains("monitorData") && item["monitorData"].is_array()) { int j = 0; @@ -706,7 +712,7 @@ bool parseJsonMessageUD(const std::string& json_str, const std::string& output_d m.status = monitor_item.value("status", ""); m.logical_device_seq = monitor_item.value("lineNo", ""); m.terminal_connect = monitor_item.value("ptType", ""); - m.timestamp = json_data.timestamp; + //m.timestamp = json_data.timestamp; m.terminal_id = json_data.terminal_id; } } @@ -777,8 +783,9 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms ushort line; bool realData = false, soeData = false; int limit = 0; + int idx = 0; - if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) { + if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit,idx)) { std::cerr << "Failed to parse the JSON message." << std::endl; DIY_ERRORLOG("process", "【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确", FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str()); return rocketmq::RECONSUME_LATER; @@ -794,10 +801,13 @@ rocketmq::ConsumeStatus myMessageCallbackrtdata(const rocketmq::MQMessageExt& ms if (ClientManager::instance().get_dev_status(devid) != 1) { std::cout << "devid对应装置不在线: " << devid << std::endl; // 记录日志不响应 web 端 - DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的补招触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str()); + DIY_ERRORLOG("process","【ERROR】前置的%d号进程处理topic:%s_%s的实时数据触发消息失败,装置%s不在线", g_front_seg_index,FRONT_INST.c_str(), G_MQCONSUMER_TOPIC_RT.c_str(),devid.c_str()); return rocketmq::CONSUME_SUCCESS; } + //记录idx + devidx_set(devid, idx);//每次下发都会更新,不加入运行用的结构体 + ClientManager::instance().set_real_state_count(devid, 60, line);//一秒询问一次,询问60次,下一次同一个测点调用的话就会刷新 } else{ diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 5f6d6ef..00c162f 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -429,6 +429,7 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { os << "\r\x1B[K|-- tmnl_factory : " << dev.tmnl_factory << "\n"; os << "\r\x1B[K|-- tmnl_status : " << dev.tmnl_status << "\n"; os << "\r\x1B[K|-- timestamp : " << dev.timestamp << "\n"; + os << "\r\x1B[K|-- Righttime : " << dev.Righttime << "\n"; os << "\r\x1B[K|-- mac : " << dev.mac << "\n"; // ========================= 终端级 · 内部定值 ========================= diff --git a/LFtid1056/dealMsg.cpp b/LFtid1056/dealMsg.cpp index 55a0834..2a23db8 100644 --- a/LFtid1056/dealMsg.cpp +++ b/LFtid1056/dealMsg.cpp @@ -13,6 +13,7 @@ #include "cloudfront/code/interface.h" //lnk20250708 #include "cloudfront/code/rocketmq.h" //lnk20250708 +#include "cloudfront/code/log4.h" //lnk20250924 #include "client2.h" #include "cloudfront/code/log4.h" @@ -122,17 +123,13 @@ void process_received_message(string mac, string id,const char* data, size_t len //end_time.tm_min = 1; //end_time.tm_sec = 1; //ClientManager::instance().read_eventlog_action_to_device(id, start_time, end_time,2,1); - - //DIY_ERRORLOG_CODE("111", 0, static_cast(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 前置"); - //DIY_ERRORLOG_CODE(id, 1, static_cast(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 设备"); - /*std::string mpid; + DIY_ERRORLOG_CODE("111", 0, static_cast(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 前置"); + DIY_ERRORLOG_CODE(id, 1, static_cast(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 设备"); + std::string mpid; get_monitor_id_by_dev_and_seq(id, 1, mpid); if (!mpid.empty()) { DIY_ERRORLOG_CODE(mpid, 2, static_cast(LogCode::LOG_CODE_OTHER), "【ERROR】测试告警发送 测点"); - }*/ - - - + } } if (udata[19] == 0x00) { std::cout << "cloud login: " << mac << " state: fail!" << std::endl; @@ -153,6 +150,7 @@ void process_received_message(string mac, string id,const char* data, size_t len //处理主动上送的暂态事件报文 NewTaglogbuffer event = NewTaglogbuffer::createFromData(parser.RecvData.data(), parser.RecvData.size()); +<<<<<<< HEAD //获取测点id std::string mpid; get_monitor_id_by_dev_and_seq(id, 1, mpid); @@ -234,6 +232,8 @@ void process_received_message(string mac, string id,const char* data, size_t len } +======= +>>>>>>> 096e590 (add rtdata idx) // 获取测点参数 std::string strScale;//电压等级 int nPTType;//接线方式