/** * @file: $RCSfile: save2json.h,v $ * @brief: $IEC 61850 Protocol * * @version: $Revision: 1.5 $ * @date: $Date: 2018/12/23 12:39:52 $ * @author: $Author: lizhongming $ * @state: $State: Exp $ * * @latest: $Id: save2json.h,v 1.5 2018/12/23 12:39:52 lizhongming Exp $ * */ #ifndef SAVE2DB_8ue3hy0923r_H #define SAVE2DB_8ue3hy0923r_H #ifdef __cplusplus extern "C" { #endif extern int g_front_seg_index; #ifdef __cplusplus } #endif #ifdef __cplusplus #include #include #include #include #include //lnk20250106 #include "../include/rocketmq/SimpleProducer.h" //日志功能 #include "../cfg_parse/custom_printf.h"//lnk20250225 #include #include #include #include #include #include #include #include #include #include #include extern int G_TEST_NUM; extern void ledger(const char* terminal_id = NULL,QIODevice* outputDevice = NULL); extern void value_print(const char *variableName, QTcpSocket *clientSocket); extern int TEST_PORT; ////////////////////////////////////////////////////////////////////////////// //struct json_pair_info //{ // string topic; // "RTDATA" 实时数据 "RTDATASOE"实时SOE事件 等 // string data_type; //数据类型,01/04:稳态/召测稳态,02/05:闪变/召测闪变,03/06:暂态/召测暂态 // string item; // 或"I" "PQ" 等 // string sequence; //"A" 或 "B" 或 "C" 或 "T" // string name; //json的key // int type; //6-值索引、9-实时SOE事件 // string mms_ref; //mms地址字符串 // float coeff; //Coefficient:数据系数 // unsigned short PltFlag; //0xffff //}; ///////////////////////////////////////////////////////////////////////////// class KafkaSendThread : public QThread { // Q_OBJECT //public: protected: void run(); }; //WW 2023-08-22 增加数据库线程和WebSocket线程 class SQLExcuteThread : public QThread { protected: void run(); }; class WebSocketThread : public QThread { protected: void run(); }; //lnk20241029 class WebhttpThread : public QThread { protected: void run(); }; class httpThread : public QThread { protected: void run(); }; //lnk20241202 /*class mqtestThread : public QThread { protected: void run(); };*/ //lnk20250106 extern bool showinshellflag; class Worker : public QObject { Q_OBJECT public: Worker(QObject *parent = NULL) : QObject(parent), server(NULL), TEST_NUM(G_TEST_NUM), timer(NULL), historyIndex(-1), stopViewLog(false), activeClient(NULL) { } ~Worker() { // 清理定时器和服务器 stopServer(); } void handleViewLogCommand(const QString& command, QTcpSocket* clientSocket); public slots: void startServer() { if (server) { qDebug() << "Server is already running!"; return; // 防止重复启动服务器 } // 创建 QTcpServer 并设置信号与槽 server = new QTcpServer(this); connect(server, SIGNAL(newConnection()), this, SLOT(onNewConnection())); // 尝试监听端口 if (!server->listen(QHostAddress::Any, TEST_PORT)) { std::cout << "Server failed to start!" << std::endl; qDebug() << "Server failed to start!"; emit serverError(); return; } else { std::cout << "Server is running on port " << TEST_PORT << std::endl; qDebug() << QString("Server is running on port %1").arg(TEST_PORT); } // 创建并启动定时器 timer = new QTimer(this); connect(timer, SIGNAL(timeout()), this, SLOT(doPeriodicTask())); timer->start(60000); // 每60秒触发一次 std::cout << "Timer started, event loop running in thread: " << QThread::currentThreadId() << std::endl; qDebug() << "Timer started, event loop running in thread:" << QThread::currentThreadId(); } void stopServer() { // 停止服务器并清理资源 if (server) { server->close(); delete server; server = NULL; qDebug() << "Server stopped."; } // 停止定时器 if (timer) { timer->stop(); delete timer; timer = NULL; qDebug() << "Timer stopped."; } } void setTestNum(int num) { QMutexLocker locker(&mutex); TEST_NUM = num; } private slots: void doPeriodicTask() { QMutexLocker locker(&mutex); std::cout << "Executing TEST_NUM is " << TEST_NUM << std::endl; qDebug() << "doPeriodicTask() called. TEST_NUM = " << TEST_NUM; if (TEST_NUM != 0) { qDebug() << "Executing rocketmq_test_300()"; std::cout << "Executing rocketmq_test_300()\n"; rocketmq_test_300(TEST_NUM, g_front_seg_index); } } void onNewConnection() { if (!server) return; QTcpSocket *clientSocket = server->nextPendingConnection(); qDebug() << "New connection established!"; std::cout << "New connection established!\n"; // 当有数据可读时,进入 onReadyRead() connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead())); // 当客户端断开时,自动清理 socket connect(clientSocket, SIGNAL(disconnected()), clientSocket, SLOT(deleteLater())); // 向客户端发送提示符 if (clientSocket) { std::cout << "clientSocket OK\n"; clientSocket->write("Welcome to the test shell. Type 'help' for available commands.\n> "); clientSocket->flush(); // 确保消息立即发送 } } /** * @brief 逐字节处理Telnet输入,识别方向键(上下)、退格、回车等 * 当按下回车时,将当前行内容视为一条命令交给 processCommand 解析。 */ void onReadyRead() { QTcpSocket *clientSocket = qobject_cast(sender()); if (!clientSocket) { std::cout << "Invalid socket\n"; return; } QByteArray data = clientSocket->readAll(); // 逐字节处理输入 for (int i = 0; i < data.size(); ++i) { char c = data[i]; if (c == 'q') { // ? 用户输入 `q` 退出 `viewlog` std::cout << "Received 'q' from shell socket! Exiting viewlog...\n"; if (activeClient == clientSocket) { stopViewLog = true; // ? 让 `viewlog` 退出 showinshellflag = false; clientSocket->write("\nLog view stopped. Returning to shell.\n> "); clientSocket->flush(); } return; // ? 立即返回,避免继续处理其他字符 } switch (c) { case '\r': case '\n': // ? 处理回车,执行命令 if (!currentCommand.isEmpty()) { if (commandHistory.isEmpty() || commandHistory.last() != currentCommand) { commandHistory.append(currentCommand); } historyIndex = commandHistory.size(); // 指向最新一条之后 processCommand(currentCommand, clientSocket); currentCommand.clear(); } else { clientSocket->write("\n> "); clientSocket->flush(); } break; case '\x1b': // ? 处理方向键 (上箭头 `\x1b[A`,下箭头 `\x1b[B`) if (i + 2 < data.size() && data[i+1] == '[') { char arrow = data[i+2]; if (arrow == 'A') { handleUpArrow(clientSocket); // **调用上箭头处理** } else if (arrow == 'B') { handleDownArrow(clientSocket); // **调用下箭头处理** } i += 2; // ? **跳过 ESC 序列 `\x1b[A` 或 `\x1b[B`** continue; } break; case '\x7f': case '\x08': // ? 处理退格键 if (!currentCommand.isEmpty()) { currentCommand.chop(1); clientSocket->write("\b \b"); clientSocket->flush(); } break; default: // ? 普通字符,追加到 `currentCommand` currentCommand.append(c); clientSocket->write(&c, 1); clientSocket->flush(); break; } } } signals: void serverError(); private: // ====================== 新增日志功能 ========================= bool stopViewLog; // ? 这里不需要 static,针对每个 client QTcpSocket* activeClient; // 记录当前正在 `viewlog` 的客户端 // -------------------- // 以下为新增的辅助函数 // -------------------- /** * @brief 处理输入命令(完整的一行),即原先在 onReadyRead() 大量 if-else 的逻辑 */ void processCommand(const QString &command, QTcpSocket *clientSocket) { // 打印日志 qDebug() << "Received command:" << command; std::cout << "Received command: " << command.toStdString() << "\n"; // 以下为原先 onReadyRead() 中的 if / else if 处理逻辑 // ------------------------------------------------------ if (command == "help") { QString helpText = "Available commands:\n"; helpText += "TEST_NUM= - Set the TEST_NUM\n"; helpText += "rc - Execute rocketmq_test_rc\n"; helpText += "rt - Execute rocketmq_test_rt\n"; helpText += "ud - Execute rocketmq_test_ud\n"; helpText += "set - Execute rocketmq_test_set\n"; helpText += "only - Execute rocketmq_test_only\n"; helpText += "log - Execute rocketmq_test_log\n"; helpText += "ledger - Execute ledger with optional terminal_id\n"; helpText += "viewlog - View logs (ERROR, WARN, NORMAL, DEBUG)\n"; helpText += "value - Execute value print with valuename : iedcount frontfun frontindex remtable\n"; helpText += "exit - Exit the shell\n"; helpText += "help - Show this help message\n"; clientSocket->write(helpText.toUtf8()); } else if (command.startsWith("viewlog")) { showinshellflag = true; handleViewLogCommand(command, clientSocket); } // 设置多点模拟的个数 else if (command.startsWith("TEST_NUM=")) { bool ok; int num = command.mid(9).toInt(&ok); // 获取等号后面的数字部分 if (ok) { setTestNum(num); // 更新 TEST_NUM clientSocket->write("TEST_NUM updated\n"); std::cout << "TEST_NUM updated\n"; } else { clientSocket->write("Invalid number\n"); std::cout << "Invalid number\n"; } } // 发送补招数据测试文本 else if (command.startsWith("rc")) { qDebug() << "Executing rocketmq_test_rc()"; std::cout << "Executing rocketmq_test_rc()\n"; rocketmq_test_rc(); // 调用 rc 函数 clientSocket->write("Executed rocketmq_test_rc\n"); } // 发送实时数据测试文本 else if (command.startsWith("rt")) { qDebug() << "Executing rocketmq_test_rt()"; std::cout << "Executing rocketmq_test_rt()\n"; rocketmq_test_rt(); // 调用 rt 函数 clientSocket->write("Executed rocketmq_test_rt\n"); } // 发送台账更新测试文本 else if (command.startsWith("ud")) { qDebug() << "Executing rocketmq_test_ud()"; std::cout << "Executing rocketmq_test_ud()\n"; rocketmq_test_ud(); // 调用 ud 函数 clientSocket->write("Executed rocketmq_test_ud\n"); } // 发送进程控制测试文本 else if (command.startsWith("set")) { qDebug() << "Executing rocketmq_test_set()"; std::cout << "Executing rocketmq_test_set()\n"; rocketmq_test_set(); // 调用 set 函数 clientSocket->write("Executed rocketmq_test_set\n"); } // 发送单连进程测试文本 else if (command.startsWith("only")) { qDebug() << "Executing rocketmq_test_only()"; std::cout << "Executing rocketmq_test_only()\n"; rocketmq_test_only(); // 调用 rocketmq_test_only 函数 clientSocket->write("Executed rocketmq_test_only\n"); } // 发送实时日志测试文本 else if (command.startsWith("log")) { qDebug() << "Executing rocketmq_test_log()"; std::cout << "Executing rocketmq_test_log()\n"; rocketmq_test_log(); // 调用 log 函数 clientSocket->write("Executed rocketmq_test_log\n"); } // 查看当前进程的台账 else if (command.startsWith("ledger")) { qDebug() << "Executing ledger()"; std::cout << "Executing ledger()\n"; // 提取参数 QStringList parts = command.split(" "); // 根据空格分割命令 if (parts.size() > 1) { // 如果命令中包含参数(即 id号) QString terminalId = parts[1]; std::cout << "Calling ledger with terminal_id: " << terminalId.toStdString() << std::endl; ledger(terminalId.toStdString().c_str(), clientSocket); // 带参数调用 ledger clientSocket->write("Executed ledger with terminal_id\n"); } else { std::cout << "Calling ledger without parameters\n"; ledger(NULL, clientSocket); // 无参数调用 ledger clientSocket->write("Executed ledger without parameters\n"); } } // 查看当前进程的指定值 else if (command.startsWith("value")) { std::cout << "Executing value()" << std::endl; // 提取命令中的参数,获取变量名 QStringList parts = command.split(" "); if (parts.size() > 1) { QString variableName = parts[1]; std::cout << "Calling value() with variable name: " << variableName.toStdString() << std::endl; // 调用 value_print() 输出变量值 value_print(variableName.toStdString().c_str(), clientSocket); clientSocket->write("Executed value with variable name: " + variableName.toUtf8() + "\n"); } else { std::cout << "Calling value without parameters" << std::endl; clientSocket->write("Please provide a variable name\n"); } } // 处理 exit 命令 else if (command == "exit") { // 发送退出信息并关闭客户端连接 clientSocket->write("Goodbye! Exiting shell...\n"); clientSocket->flush(); clientSocket->disconnectFromHost(); // 关闭连接 clientSocket->waitForDisconnected(); // 确保连接断开 return; } // 未知命令 else { clientSocket->write("Unknown command\n"); } // 处理完命令后,回显一个新行和提示符 clientSocket->write("> "); clientSocket->flush(); } /** * @brief 处理上箭头,从历史记录中调出上一条命令 */ void handleUpArrow(QTcpSocket *clientSocket) { if (!commandHistory.isEmpty() && historyIndex > 0) { historyIndex--; currentCommand = commandHistory[historyIndex]; // **清空当前行** clientSocket->write("\r"); // 移动光标到行首 clientSocket->write(" "); // 用空格覆盖 clientSocket->write("\r> "); // 重新打印提示符 clientSocket->write(currentCommand.toUtf8()); // 回显历史命令 clientSocket->flush(); } } /** * @brief 处理下箭头,从历史记录中调出下一条命令 */ void handleDownArrow(QTcpSocket *clientSocket) { if (!commandHistory.isEmpty() && historyIndex < commandHistory.size() - 1) { historyIndex++; currentCommand = commandHistory[historyIndex]; // **清空当前行** clientSocket->write("\r"); clientSocket->write(" "); clientSocket->write("\r> "); clientSocket->write(currentCommand.toUtf8()); // 回显历史命令 clientSocket->flush(); } else if (historyIndex == commandHistory.size() - 1) { // **如果已经是最后一条,再按下箭头,则清空当前输入** historyIndex = commandHistory.size(); currentCommand.clear(); clientSocket->write("\r"); clientSocket->write(" "); clientSocket->write("\r> "); clientSocket->flush(); } } private: QTcpServer *server; QTimer *timer; int TEST_NUM; QMutex mutex; // 历史命令相关 QList commandHistory; // 存储历史命令 int historyIndex; // 当前历史命令索引 QString currentCommand; // 当前正在输入的命令 }; //lnk20241213 class mqconsumerThread : public QThread { protected: void run(); }; class OnTimerThread : public QThread//定时线程 { protected: void run(); }; //WW 2023-08-22 end ///////////////////////////////////////////////////////////////////////// extern "C" { #endif //lnk20250106添加台账结构 typedef struct terminal terminal; typedef struct monitor monitor; struct monitor // 监测点台账 { char monitor_id[64]; char terminal_code[64]; char monitor_name[64]; char logical_device_seq[64]; char voltage_level[64]; char terminal_connect[64]; char timestamp[64]; char status[255]; }; struct terminal // 终端台账 { char terminal_id[64]; char terminal_code[64]; char org_name[64]; char maint_name[64]; char station_name[64]; char tmnl_factory[64]; char tmnl_status[64]; char dev_type[64]; char dev_key[255]; char dev_series[255]; char processNo[64]; //lnk20250210进程号 char addr_str[64]; char port[64]; char timestamp[64]; monitor line[10]; // 最多 10 个监测点 }; #ifdef __cplusplus } #endif ///////////////////////////////////////////////////////////////////////////// #endif //SAVE2DB_8ue3hy0923r_H