diff --git a/LFtid1056/cloudfront/code/cfg_parser.cpp b/LFtid1056/cloudfront/code/cfg_parser.cpp index 24f6322..e8cd548 100644 --- a/LFtid1056/cloudfront/code/cfg_parser.cpp +++ b/LFtid1056/cloudfront/code/cfg_parser.cpp @@ -1288,7 +1288,7 @@ int recall_json_handle_from_mq(const std::string& body) if (ClientManager::instance().get_dev_status(targetDev->terminal_id) != 1) { std::cout << "terminalId对应装置不在线: " << targetDev->terminal_id << std::endl; std::string msg = std::string("装置:") + targetDev->terminal_name + " 不在线,无法补招"; - send_reply_to_kafka_recall(guid, dt, static_cast(ResponseCode::INTERNAL_ERROR), msg, targetDev->terminal_id, "", "", ""); + send_reply_to_kafka_recall(guid, dt, static_cast(ResponseCode::NOT_FOUND), msg, targetDev->terminal_id, "", "", ""); continue; } } @@ -1363,6 +1363,7 @@ int recall_json_handle_from_mq(const std::string& body) for (size_t i = 0; i < recallinfo_list_hour.size(); ++i) { const RecallInfo& info = recallinfo_list_hour[i]; RecallMonitor rm; + rm.recall_guid = guid; rm.recall_status = 0; rm.StartTime = epoch_to_datetime_str(info.starttime); rm.EndTime = epoch_to_datetime_str(info.endtime); @@ -1379,12 +1380,14 @@ int recall_json_handle_from_mq(const std::string& body) } } else { // 未知 dataType,忽略 + std::cout << "unknown dataType: " << dt << std::endl; continue; } } } else { // 不支持的 messageBody 形态 + std::cout << "unknown messageBody form" << std::endl; return 10004; } } @@ -3406,45 +3409,76 @@ bool send_file_list(terminal_dev* dev, const std::vector& FileList /////////////////////////////////////////////////////////////////////////////////////////////////////////////检查云前置终端的mq业务超时 std::string get_type_by_state(int state) { - switch (static_cast(state)) { - case DeviceState::READING_STATS: - return "读取统计数据"; //读数据 - case DeviceState::READING_STATS_TIME: + switch (state) { + case static_cast(DeviceState::IDLE): + return "空闲状态"; + + case static_cast(DeviceState::READING_STATS): + return "读取统计数据"; + + case static_cast(DeviceState::READING_STATS_TIME): return "读取统计时间"; - case DeviceState::READING_REALSTAT: + + case static_cast(DeviceState::READING_REALSTAT): return "读取实时数据"; - case DeviceState::READING_FIXEDVALUE: - return "读取定值"; - case DeviceState::READING_FIXEDVALUEDES: - return "读取定值描述"; - case DeviceState::SET_FIXEDVALUE: - return "设置定值"; - case DeviceState::READING_INTERFIXEDVALUE: - return "读取内部定值"; - case DeviceState::READING_INTERFIXEDVALUEDES: - return "读取内部定值描述"; - case DeviceState::READING_CONTROLWORD: - return "读取控制字"; - case DeviceState::SET_INTERFIXEDVALUE: - return "设置内部定值"; - //return 0x2106; //读数据 - case DeviceState::READING_FILEMENU: + case static_cast(DeviceState::READING_EVENTFILE): + return "暂态波形文件下载"; + + case static_cast(DeviceState::READING_FILEMENU): return "读取文件目录"; - //return 0x2131; //读目录 - case DeviceState::READING_EVENTFILE: - return "读取事件文件目录"; - //return 0x2133; //读事件文件目录 - case DeviceState::READING_FILEDATA: + case static_cast(DeviceState::READING_FILEDATA): return "读取文件数据"; - //return 0x2132; //读文件 + + case static_cast(DeviceState::READING_FIXEDVALUE): + return "读取测点定值"; + + case static_cast(DeviceState::READING_FIXEDVALUEDES): + return "读取测点定值描述"; + + case static_cast(DeviceState::SET_FIXEDVALUE): + return "设置测点定值"; + + case static_cast(DeviceState::READING_INTERFIXEDVALUE): + return "读取内部定值"; + + case static_cast(DeviceState::READING_INTERFIXEDVALUEDES): + return "读取内部定值描述"; + + case static_cast(DeviceState::READING_CONTROLWORD): + return "读取控制字描述"; + + case static_cast(DeviceState::SET_INTERFIXEDVALUE): + return "设置内部定值"; + + case static_cast(DeviceState::READING_RUNNINGINFORMATION_1): + return "读取装置运行信息(主动触发)"; + + case static_cast(DeviceState::READING_RUNNINGINFORMATION_2): + return "读取装置运行信息(定时执行)"; + + case static_cast(DeviceState::READING_DEVVERSION): + return "读取装置版本配置信息"; + + case static_cast(DeviceState::SET_RIGHTTIME): + return "设置装置对时"; + + case static_cast(DeviceState::READING_EVENTLOG): + return "补招事件日志"; + + case static_cast(DeviceState::READING_STATSFILE): + return "补招稳态数据文件"; + + case static_cast(DeviceState::CUSTOM_ACTION): + return "自定义动作"; default: - return "未知业务"; // 没有对应的type + return "未知业务"; // 未匹配的类型 } } + // 定时检查业务超时 void check_device_busy_timeout() { @@ -3458,7 +3492,7 @@ void check_device_busy_timeout() if (dev.busytype == static_cast(DeviceState::READING_FILEDATA)) //下载文件业务 { - if (dev.busytimecount > 30) + if (dev.busytimecount > 60) { std::cout << "[Timeout] Device " << dev.terminal_id << " busytype=READING_FILEDATA 超时(" @@ -3478,7 +3512,7 @@ void check_device_busy_timeout() } else //其他业务 { - if (dev.busytimecount > 10) + if (dev.busytimecount > 20) { std::cout << "[Timeout] Device " << dev.terminal_id << " busytype=" << dev.busytype @@ -4064,6 +4098,7 @@ void check_recall_event() { for (auto& dev : terminal_devlist) { //如果该终端不是正在补招或者idle则直接跳过,节省运行时间 if(dev.busytype != static_cast(DeviceState::READING_EVENTLOG) && dev.busytype != static_cast(DeviceState::IDLE)){ + std::cout << "[check_recall_event] skip dev=" << dev.terminal_id << std::endl; continue; } // 对正在补招或idle终端的所有监测点的待补招列表进行处理 @@ -4085,7 +4120,9 @@ void check_recall_event() { send_reply_to_kafka_recall(dev.guid,1,static_cast(ResponseCode::OK),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime); lm.recall_list.pop_front(); // 弹掉首条 - }else if (front.recall_status == static_cast(RecallStatus::EMPTY)) { + break; + } + else if (front.recall_status == static_cast(RecallStatus::EMPTY)) { std::cout << "[check_recall_event] EMPTY dev=" << dev.terminal_id << " monitor=" << lm.monitor_id << " " << front.StartTime << " ~ " << front.EndTime << std::endl; @@ -4098,6 +4135,7 @@ void check_recall_event() { send_reply_to_kafka_recall(dev.guid,1,static_cast(ResponseCode::NOT_FOUND),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime); lm.recall_list.pop_front(); // 弹掉首条 + break; } else if (front.recall_status == static_cast(RecallStatus::FAILED)) { std::cout << "[check_recall_event] FAILED dev=" << dev.terminal_id @@ -4112,31 +4150,16 @@ void check_recall_event() { send_reply_to_kafka_recall(dev.guid,1,static_cast(ResponseCode::BAD_REQUEST),msg,dev.terminal_id,lm.monitor_id,front.StartTime,front.EndTime); lm.recall_list.pop_front(); // 弹掉首条 + break; } else { - break; // 首条不是 2/3,停,如果是正在处理其他业务或者idle的装置写入了待补招列表,应该都是0;如果是正在补招的装置,新增的部分不会影响原有顺序 + std::cout << "[check_recall_event] skip line=" << lm.monitor_name<< std::endl; + break; // 首条不是 2/3/4,停,如果是正在处理其他业务或者idle的装置写入了待补招列表,应该都是0;如果是正在补招的装置,新增的部分不会影响原有顺序 } } if (!lm.recall_list.empty()) any_non_empty = true; // 处理了成功和失败的以后只要有一条非空就标记,可能是待处理或者正在处理的补招 } - if (!any_non_empty && dev.busytype == static_cast(DeviceState::READING_EVENTLOG)) { - // 该终端本轮已无任何补招条目,且处于补招暂态事件的状态清空运行态 - //通知补招全部完成 - - dev.guid.clear(); // 清空 guid - dev.busytype = 0; // 业务类型归零 - dev.isbusy = 0; // 清空业务标志 - dev.busytimecount = 0; // 计时归零 - continue; - } - //如果没有待补招任务,或者正在进行其他业务,应该跳过 - else if(!any_non_empty || (dev.busytype != static_cast(DeviceState::IDLE) && dev.busytype != static_cast(DeviceState::READING_EVENTLOG))){ - continue; - } - - //有待补招任务且处于补招事件状态或者idle状态,继续补招处理 - - // 2) 若该装置任一 monitor 的首条为 RUNNING,则该终端正在补招中 -> 跳过该终端,不会下发新的补招请求 + // pop后判断是否仍有 RUNNING(pop后应该都为unstarted,没pop的才会是running) bool has_running = false; for (auto& lm : dev.line) { if (!lm.recall_list.empty() && @@ -4145,37 +4168,74 @@ void check_recall_event() { break; } } - if (has_running) continue;//跳过这个装置 + + // 有条目但目前存在 RUNNING:继续等待该 RUNNING 完成,本轮不新发 + if (any_non_empty && has_running) { + std::cout << "[check_recall_event] skip dev=" << dev.terminal_id + << " already running recall" << std::endl; + continue; + } + + //pop后无任何补招条目,且处于补招状态:清空运行态 + if (!any_non_empty && dev.busytype == static_cast(DeviceState::READING_EVENTLOG)) { + // 该终端本轮已无任何补招条目,且处于补招暂态事件的状态清空运行态 + std::cout << "[check_recall_event] finish recall dev=" << dev.terminal_id << std::endl; + //通知补招全部完成 + + dev.guid.clear(); // 清空 guid + dev.busytype = static_cast(DeviceState::IDLE); // 业务类型归零 + dev.isbusy = 0; // 清空业务标志 + dev.busytimecount = 0; // 计时归零 + continue; //处理完处理下一个装置 + } + + + + //有待补招任务,且idle或者在补招继续补招处理 + std::cout << "[check_recall_event] idle or continue recall dev=" << dev.terminal_id << std::endl; // 若无 RUNNING,则说明该终端空闲,可以挑选新的补招任务 + if (any_non_empty && !has_running) { - // 3) 选择该终端的“第一条 NOT_STARTED(0)”作为本终端本轮任务 - bool picked = false; - for (auto& lm : dev.line) { - if (lm.recall_list.empty()) continue; //跳过空的监测点 + // 3) 选择该终端的“第一条 NOT_STARTED(0)”作为本终端本轮任务 + bool picked = false; + for (auto& lm : dev.line) { + if (lm.recall_list.empty()) { + std::cout << "[check_recall_event] skip empty line=" << lm.monitor_name<< std::endl; + continue; //跳过空的监测点 + } - RecallMonitor& front = lm.recall_list.front(); //取非空测点的列表的第一条 - if (front.recall_status == static_cast(RecallStatus::NOT_STARTED)) {//未补招 - // 标记为 RUNNING,并设置终端忙状态 - front.recall_status = static_cast(RecallStatus::RUNNING);//该补招记录刷新为补招中 + RecallMonitor& front = lm.recall_list.front(); //取非空测点的列表的第一条 + if (front.recall_status == static_cast(RecallStatus::NOT_STARTED)) {//未补招 + // 标记为 RUNNING,并设置终端忙状态 + front.recall_status = static_cast(RecallStatus::RUNNING);//该补招记录刷新为补招中 - dev.isbusy = 1; //标记为忙 - dev.busytype = static_cast(DeviceState::READING_EVENTLOG);//装置状态正在补招和idle的都刷新为正在补招 - dev.busytimecount = 0; //刷新业务超时计数 - - // 记录任务(每终端只取这一条,多个装置可以同时进行) - tasks.push_back(RecallTask{ - dev.terminal_id, - front.StartTime, - front.EndTime, - lm.logical_device_seq//记录测点号 - }); - picked = true; //该装置已取 - break; + dev.guid = front.recall_guid; // 记录本次补招的 guid + dev.isbusy = 1; //标记为忙 + dev.busytype = static_cast(DeviceState::READING_EVENTLOG);//装置状态正在补招和idle的都刷新为正在补招 + dev.busytimecount = 0; //刷新业务超时计数 + + // 记录任务(每终端只取这一条,多个装置可以同时进行) + tasks.push_back(RecallTask{ + dev.terminal_id, + front.StartTime, + front.EndTime, + lm.logical_device_seq//记录测点号 + }); + picked = true; //该装置已取 + break; + } } + // 若该终端没有 NOT_STARTED 的首条(可能剩下的都是 RUNNING 或内部已经被清空), + if (!picked) { + // 没挑到 NOT_STARTED(例如都是“首条非空但非 NOT_STARTED/非 RUNNING”的情况): + // 这种情况下本终端留给下一轮处理即可。 + std::cout << "[check_recall_event] skip dev=" << dev.terminal_id + << " no NOT_STARTED at head" << std::endl; + } + // 就留待下一轮;不要清空运行态,直到所有条目被处理为止。 + continue; } - // 若该终端没有 NOT_STARTED 的首条(可能剩下的都是 RUNNING 或内部已经被清空), - // 就留待下一轮;不要清空运行态,直到所有条目被处理为止。 } } // 解锁 diff --git a/LFtid1056/cloudfront/code/front.h b/LFtid1056/cloudfront/code/front.h index 32d9f7a..fea3575 100644 --- a/LFtid1056/cloudfront/code/front.h +++ b/LFtid1056/cloudfront/code/front.h @@ -94,6 +94,11 @@ public: std::atomic m_bIsFrontThreadCancle{false}; std::atomic m_IsMQConsumerCancel{false}; + std::atomic m_frontRunning{false}; + std::atomic m_consumerRunning{false}; + std::atomic m_producerRunning{false}; + std::atomic m_timerRunning{false}; + std::mutex m_threadCheckMutex; std::atomic m_needRestartFrontThread{false}; std::atomic m_needRestartConsumerThread{false}; @@ -110,6 +115,12 @@ public: void StartMQProducerThread(); void StartTimerThread(); + // [ADD] 统一的停止接口(便于重启前先停干净) + void StopFrontThread(); + void StopMQConsumerThread(); + void StopMQProducerThread(); + void StopTimerThread(); + void FrontThread(); void mqconsumerThread(); void mqproducerThread(); diff --git a/LFtid1056/cloudfront/code/interface.h b/LFtid1056/cloudfront/code/interface.h index 9aacb74..a4ed23d 100644 --- a/LFtid1056/cloudfront/code/interface.h +++ b/LFtid1056/cloudfront/code/interface.h @@ -44,6 +44,7 @@ public: class RecallMonitor { public: + std::string recall_guid; // 本次补招的唯一标识 GUID int recall_status; //补招状态 0-未补招 1-补招中 2-补招完成 3-补招失败 4-无数据 std::string StartTime; //数据补招起始时间 std::string EndTime; //数据补招结束时间 diff --git a/LFtid1056/cloudfront/code/main.cpp b/LFtid1056/cloudfront/code/main.cpp index f4f9957..b9726e5 100644 --- a/LFtid1056/cloudfront/code/main.cpp +++ b/LFtid1056/cloudfront/code/main.cpp @@ -246,7 +246,7 @@ std::string get_parent_directory() { } // ============ 关闭所有运行中的线程============ - void Front::FormClosing() { + /*void Front::FormClosing() { //确保testshell关闭 m_worker.stopServer(); @@ -286,11 +286,19 @@ std::string get_parent_directory() { m_MQConsumerThread.join(); } + }*/ + void Front::FormClosing() { + m_worker.stopServer(); + + StopFrontThread(); + StopTimerThread(); + StopMQProducerThread(); + StopMQConsumerThread(); } //============ 线程函数 ============ - void Front::StartFrontThread() { + /*void Front::StartFrontThread() { m_bIsFrontThreadCancle = false; m_FrontThread = std::thread(&Front::FrontThread, this); } @@ -308,7 +316,137 @@ std::string get_parent_directory() { void Front::StartTimerThread() { m_IsTimerCancel = false; m_TimerThread = std::thread(&Front::OnTimerThread, this); - } + } */ + void Front::StartFrontThread() { + bool expected = false; + if (!m_frontRunning.compare_exchange_strong(expected, true)) { + std::cout << "[FrontThread] already running, skip\n"; + return; + } + if (m_FrontThread.joinable()) m_FrontThread.join(); + + m_bIsFrontThreadCancle = false; + m_FrontThread = std::thread([this]{ + try { + this->FrontThread(); + } catch (const std::exception& e) { + std::cerr << "[FrontThread] exception: " << e.what() << "\n"; + } catch (...) { + std::cerr << "[FrontThread] unknown exception\n"; + } + m_frontRunning = false; // 线程真正退出后复位 + }); + } + + void Front::StartMQConsumerThread() { + bool expected = false; + if (!m_consumerRunning.compare_exchange_strong(expected, true)) { + std::cout << "[MQConsumer] already running, skip\n"; + return; + } + if (m_MQConsumerThread.joinable()) m_MQConsumerThread.join(); + + m_IsMQConsumerCancel = false; + m_MQConsumerThread = std::thread([this]{ + try { + this->mqconsumerThread(); + } catch (const std::exception& e) { + std::cerr << "[mqconsumerThread] exception: " << e.what() << "\n"; + } catch (...) { + std::cerr << "[mqconsumerThread] unknown exception\n"; + } + m_consumerRunning = false; + }); + } + + void Front::StartMQProducerThread() { + bool expected = false; + if (!m_producerRunning.compare_exchange_strong(expected, true)) { + std::cout << "[MQProducer] already running, skip\n"; + return; + } + if (m_MQProducerThread.joinable()) m_MQProducerThread.join(); + + m_IsMQProducerCancel = false; + m_MQProducerThread = std::thread([this]{ + try { + this->mqproducerThread(); + } catch (const std::exception& e) { + std::cerr << "[mqproducerThread] exception: " << e.what() << "\n"; + } catch (...) { + std::cerr << "[mqproducerThread] unknown exception\n"; + } + m_producerRunning = false; + }); + } + + void Front::StartTimerThread() { + bool expected = false; + if (!m_timerRunning.compare_exchange_strong(expected, true)) { + std::cout << "[Timer] already running, skip StartTimerThread\n"; + return; // 已有定时线程在跑,直接跳过 + } + + // 若有旧线程尚未 join,先回收 + if (m_TimerThread.joinable()) { + m_TimerThread.join(); + } + + m_IsTimerCancel.store(false, std::memory_order_relaxed); + + m_TimerThread = std::thread([this]{ + try { + this->OnTimerThread(); + } catch (const std::exception& e) { + std::cerr << "[Timer] exception: " << e.what() << "\n"; + } catch (...) { + std::cerr << "[Timer] unknown exception\n"; + } + m_timerRunning.store(false); // 线程真正退出后复位 + }); + } + + void Front::StopFrontThread() { + if (!m_frontRunning.load()) return; + m_bIsFrontThreadCancle = true; + if (m_FrontThread.joinable()) m_FrontThread.join(); + m_frontRunning = false; + } + + void Front::StopMQConsumerThread() { + if (!m_consumerRunning.load()) return; + + m_IsMQConsumerCancel = true; // 你的线程函数可能不轮询此标志,但先置上 + // 关闭 MQ 对象(避免内部阻塞线程仍在) + if (m_mqConsumer) { + try { m_mqConsumer->shutdown(); } catch (...) {} + m_mqConsumer.reset(); + } + m_listener.reset(); + + if (m_MQConsumerThread.joinable()) m_MQConsumerThread.join(); + m_consumerRunning = false; + } + + void Front::StopMQProducerThread() { + if (!m_producerRunning.load()) return; + + m_IsMQProducerCancel = true; + if (m_MQProducerThread.joinable()) m_MQProducerThread.join(); + m_producerRunning = false; + + // 如需销毁/关闭 producer,对应你的初始化方式: + // if (m_producer) { ShutdownProducer(m_producer); m_producer = nullptr; } + } + + void Front::StopTimerThread() { + if (!m_timerRunning.load()) return; // 没跑就不处理 + m_IsTimerCancel.store(true); + if (m_TimerThread.joinable()) { + m_TimerThread.join(); // 等它退出 + } + m_timerRunning.store(false); + } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////主功能线程 @@ -596,7 +734,7 @@ void* cloudfrontthread(void* arg) { { std::lock_guard lock(FrontProcess->m_threadCheckMutex); - if (FrontProcess->m_needRestartFrontThread) { + /*if (FrontProcess->m_needRestartFrontThread) { std::cout << "[Monitor] Restarting FrontThread..." << std::endl; FrontProcess->StartFrontThread(); FrontProcess->m_needRestartFrontThread = false; @@ -618,6 +756,34 @@ void* cloudfrontthread(void* arg) { std::cout << "[Monitor] Restarting TimerThread..." << std::endl; FrontProcess->StartTimerThread(); FrontProcess->m_needRestartTimerThread = false; + }*/ + + if (FrontProcess->m_needRestartFrontThread) { + std::cout << "[Monitor] Restarting FrontThread..." << std::endl; + FrontProcess->StopFrontThread(); + FrontProcess->StartFrontThread(); + FrontProcess->m_needRestartFrontThread = false; + } + + if (FrontProcess->m_needRestartConsumerThread) { + std::cout << "[Monitor] Restarting MQConsumerThread..." << std::endl; + FrontProcess->StopMQConsumerThread(); + FrontProcess->StartMQConsumerThread(); + FrontProcess->m_needRestartConsumerThread = false; + } + + if (FrontProcess->m_needRestartProducerThread) { + std::cout << "[Monitor] Restarting MQProducerThread..." << std::endl; + FrontProcess->StopMQProducerThread(); + FrontProcess->StartMQProducerThread(); + FrontProcess->m_needRestartProducerThread = false; + } + + if (FrontProcess->m_needRestartTimerThread) { + std::cout << "[Monitor] Restarting TimerThread..." << std::endl; + FrontProcess->StopTimerThread(); // 先停 + FrontProcess->StartTimerThread(); // 再启 + FrontProcess->m_needRestartTimerThread = false; } } diff --git a/LFtid1056/cloudfront/code/worker.cpp b/LFtid1056/cloudfront/code/worker.cpp index 88152af..138b71b 100644 --- a/LFtid1056/cloudfront/code/worker.cpp +++ b/LFtid1056/cloudfront/code/worker.cpp @@ -425,6 +425,12 @@ void Worker::printLedgerinshell(const terminal_dev& dev, int fd) { std::ostringstream os; os << "\r\x1B[K------------------------------------\n"; + + os << "\r\x1B[K|-- guid : " << dev.guid << "\n"; + os << "\r\x1B[K|-- busytype : " << dev.busytype << "\n"; + os << "\r\x1B[K|-- isbusy : " << dev.isbusy << "\n"; + os << "\r\x1B[K|-- busytimecount : " << dev.busytimecount << "\n"; + //os << "\r\x1B[K|-- dev_index : " << dev.dev_index << "\n"; os << "\r\x1B[K|-- terminal_id : " << dev.terminal_id << "\n"; os << "\r\x1B[K|-- terminal_name : " << dev.terminal_name << "\n";