/** * @file: $RCSfile: save2json.h,v $ * @brief: $IEC 61850 Protocol * * @version: $Revision: 1.5 $ * @date: $Date: 2018/12/23 12:39:52 $ * @author: $Author: lizhongming $ * @state: $State: Exp $ * * @latest: $Id: save2json.h,v 1.5 2018/12/23 12:39:52 lizhongming Exp $ * */ #ifndef SAVE2DB_8ue3hy0923r_H #define SAVE2DB_8ue3hy0923r_H #ifdef __cplusplus extern "C" { #endif extern int g_front_seg_index; #ifdef __cplusplus } #endif #ifdef __cplusplus #include #include #include #include #include "../mms/db_interface.h" #include //lnk20250106 #include "../include/rocketmq/SimpleProducer.h" //日志功能 #include "../cfg_parse/custom_printf.h"//lnk20250225 #include #include #include #include #include #include #include #include #include #include #include extern int G_TEST_NUM; extern void ledger(const char* terminal_id = NULL,QIODevice* outputDevice = NULL); extern void value_print(const char *variableName, QTcpSocket *clientSocket); extern int TEST_PORT; extern void redirectErrorOutput(bool enable); extern void redirectWarnOutput(bool enable); extern void redirectNormalOutput(bool enable); extern void redirectDebugOutput(bool enable); class KafkaSendThread : public QThread { protected: void run(); }; //WW 2023-08-22 增加数据库线程和WebSocket线程 class WebSocketThread : public QThread { protected: void run(); }; //lnk20241029 class WebhttpThread : public QThread { protected: void run(); }; class httpThread : public QThread { protected: void run(); }; //lnk20250106 extern bool showinshellflag; #ifdef __cplusplus extern "C" { #endif void doMonitorTaskmain(void); #ifdef __cplusplus } #endif // ====================== Telnet 常量定义 ====================== // Telnet 命令字节: #define IAC 255 // Interpret As Command #define DONT 254 #define TELDO 253 #define WONT 252 #define WILL 251 // Telnet 选项常量: #define TELOPT_ECHO 1 // 回显 #define TELOPT_SUPPRESS_GO_AHEAD 3 // SGA #define TELOPT_LINEMODE 34 // 行模式 /** * @brief Worker类:包含启动服务器、处理Telnet交互等逻辑 */ class Worker : public QObject { Q_OBJECT public: Worker(QObject *parent = NULL) : QObject(parent), server(NULL), TEST_NUM(G_TEST_NUM), timer(NULL), historyIndex(-1), stopViewLog(true), g_stopTelnetTest(true), activeClient(NULL) { } ~Worker() { // 清理定时器和服务器 stopServer(); } void handleViewLogCommand(const QString& command, QTcpSocket* clientSocket); int init_ping_telnet(QTcpSocket* clientSocket, int& ip_count, int& telnet_count); void telnetetst(QTcpSocket* clientSocket); public slots: /** * @brief 启动服务器 */ void startServer() { if (server) { qDebug() << "Server is already running!"; return; // 防止重复启动服务器 } // 创建 QTcpServer 并设置信号与槽 server = new QTcpServer(this); connect(server, SIGNAL(newConnection()), this, SLOT(onNewConnection())); // 尝试监听端口 if (!server->listen(QHostAddress::Any, TEST_PORT)) { std::cout << "Server failed to start!" << std::endl; qDebug() << "Server failed to start!"; emit serverError(); return; } else { std::cout << "Server is running on port " << TEST_PORT << std::endl; qDebug() << QString("Server is running on port %1").arg(TEST_PORT); } // 创建并启动定时器 timer = new QTimer(this); connect(timer, SIGNAL(timeout()), this, SLOT(doPeriodicTask())); timer->start(60000); // 每60秒触发一次 // 开启另一个周期函数用来替换主线程的监控 QTimer *monitorTimer = new QTimer(this); connect(monitorTimer, SIGNAL(timeout()), this, SLOT(doMonitorTask())); monitorTimer->start(1000); // 每1秒触发一次 std::cout << "Timer started, event loop running in thread: " << QThread::currentThreadId() << std::endl; qDebug() << "Timer started, event loop running in thread:" << QThread::currentThreadId(); } /** * @brief 停止服务器 */ void stopServer() { // 停止服务器并清理资源 if (server) { server->close(); delete server; server = NULL; qDebug() << "Server stopped."; } // 停止定时器 if (timer) { timer->stop(); delete timer; timer = NULL; qDebug() << "Timer stopped."; } } /** * @brief 设置TEST_NUM */ void setTestNum(int num) { QMutexLocker locker(&mutex); TEST_NUM = num; } void setTestlog(bool flag) { redirectErrorOutput(flag); redirectWarnOutput(flag); redirectNormalOutput(flag); redirectDebugOutput(flag); } private slots: /** * @brief 定时任务 */ void doPeriodicTask() { QMutexLocker locker(&mutex); std::cout << "Executing TEST_NUM is " << TEST_NUM << std::endl; qDebug() << "doPeriodicTask() called. TEST_NUM = " << TEST_NUM; if (TEST_NUM != 0) { qDebug() << "Executing rocketmq_test_300()"; std::cout << "Executing rocketmq_test_300()\n"; rocketmq_test_300(TEST_NUM, g_front_seg_index); } } /** * @brief 监控任务 */ void doMonitorTask() { doMonitorTaskmain(); } /** * @brief 当有新客户端连接时处理 */ void onNewConnection() { if (!server) return; QTcpSocket *clientSocket = server->nextPendingConnection(); qDebug() << "New connection established!"; std::cout << "New connection established!\n"; // 绑定 readyRead / disconnected connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead())); connect(clientSocket, SIGNAL(disconnected()), clientSocket, SLOT(deleteLater())); // 发送 Telnet 协商 sendTelnetNegotiation(clientSocket); // 发送欢迎信息和提示符 if (clientSocket) { std::cout << "clientSocket OK\n"; clientSocket->write("\r\x1B[K"); clientSocket->write("Welcome to the test shell. Type 'help' for available commands.\r\n"); printPrompt(clientSocket); // 统一打印提示符 } } /** * @brief 处理客户端发送的Telnet数据 */ void onReadyRead() { QTcpSocket *clientSocket = qobject_cast(sender()); if (!clientSocket) { std::cout << "Invalid socket\n"; return; } QByteArray data = clientSocket->readAll(); for (int i = 0; i < data.size(); ++i) { unsigned char c = static_cast(data[i]); // 如果检测到 IAC(255),说明是 Telnet 协商指令 if (c == IAC) { // 简单跳过 TELDO/DONT/WILL/WONT + option if (i + 1 < data.size()) { unsigned char cmd = static_cast(data[i+1]); if (cmd == TELDO || cmd == DONT || cmd == WILL || cmd == WONT) { i += 2; // 跳过这2字节 } else { // 遇到其它情况(比如IAC SB),此处仅简单跳过一个字节 i += 1; } } continue; } // 1) 处理 '`' 退出 viewlog 和ping if (c == '`') { std::cout << "Received '`' from shell socket! Exiting viewlog...\n"; if (activeClient == clientSocket) { stopViewLog = true; g_stopTelnetTest = true; clientSocket->write("\r\x1B[K"); clientSocket->write("\r\nLog view stopped. Returning to shell.\r\n"); printPrompt(clientSocket); } return; } // 2) 回车换行:执行命令 if (c == '\r' || c == '\n') { if (!currentCommand.isEmpty()) { // 加到历史 if (commandHistory.isEmpty() || commandHistory.last() != currentCommand) { commandHistory.append(currentCommand); } historyIndex = commandHistory.size(); // 执行命令时,忽略前后空白 //QString trimmedCmd = currentCommand.trimmed(); currentCommand.remove(0, 1); processCommand(currentCommand, clientSocket); currentCommand.clear(); } else { // 空行 => 仅打印新的提示符 printPrompt(clientSocket); } continue; } // 3) 方向键 if (c == '\x1b') { if (i + 2 < data.size() && data[i+1] == '[') { char arrow = data[i+2]; if (arrow == 'A') { handleUpArrow(clientSocket); } else if (arrow == 'B') { handleDownArrow(clientSocket); } i += 2; } continue; } // 假设提示符固定为 "> ",提示符长度为2 const int promptLength = 1; // 4) 退格键 if (c == '\x7f' || c == '\b') { // 仅当 currentCommand 的长度大于提示符长度时,允许删除字符 if (currentCommand.length() > promptLength) { currentCommand.chop(1); // 回显退格:用 "\b \b" 来擦除屏幕上最后一个字符 clientSocket->write("\b \b"); clientSocket->flush(); } continue; } // 5) 普通字符 currentCommand.append(static_cast(c)); clientSocket->write((const char*)&c, 1); clientSocket->flush(); } } signals: void serverError(); private: // ========== Telnet 协商函数 ========== void sendTelnetNegotiation(QTcpSocket *socket) { // 发送 WILL ECHO / WILL SUPPRESS-GO-AHEAD / DONT LINEMODE static const unsigned char will_echo[3] = { IAC, WILL, TELOPT_ECHO }; static const unsigned char will_sga[3] = { IAC, WILL, TELOPT_SUPPRESS_GO_AHEAD }; static const unsigned char dont_linemode[3] = { IAC, DONT, TELOPT_LINEMODE }; socket->write(reinterpret_cast(will_echo), 3); socket->write(reinterpret_cast(will_sga), 3); socket->write(reinterpret_cast(dont_linemode), 3); socket->flush(); } /** * @brief 打印提示符:统一使用\r\n换行,并且打印"> "于行首 */ void printPrompt(QTcpSocket *clientSocket) { clientSocket->write("\n\r\x1B[K> "); clientSocket->flush(); } /** * @brief 执行一条命令(已被trimmed) */ void processCommand(const QString &cmd, QTcpSocket *clientSocket) { qDebug() << "Received command:" << cmd; std::cout << "Received command: " << cmd.toStdString() << "\n"; // 命令解析 if (cmd == "help") { QString helpText = "Available commands:\r\n"; helpText += "TEST_NUM= - Set the TEST_NUM\r\n"; helpText += "LOG= - Set the LOG\r\n"; helpText += "telnettest - Set the telnettest\r\n"; helpText += "rc - Execute rocketmq_test_rc\r\n"; helpText += "rt - Execute rocketmq_test_rt\r\n"; helpText += "ud - Execute rocketmq_test_ud\r\n"; helpText += "set - Execute rocketmq_test_set\r\n"; helpText += "only - Execute rocketmq_test_only\r\n"; helpText += "log - Execute rocketmq_test_log\r\n"; helpText += "soe - Execute http_test_soe\r\n"; helpText += "qvvr - Execute http_test_qvvr\r\n"; helpText += "connect - Execute http_test_connect\r\n"; helpText += "ledger - Execute ledger with optional terminal_id\r\n"; helpText += "viewlog - View logs (ERROR, WARN, NORMAL, DEBUG)\r\n"; helpText += "value - Execute value print with valuename : frontindex remtable iedcount frontfun log init\r\n"; helpText += "exit - Exit the shell\r\n"; helpText += "help - Show this help message\r\n"; clientSocket->write("\r\x1B[K"); clientSocket->write(helpText.toUtf8()); } else if (cmd.startsWith("viewlog")) { showinshellflag = true; handleViewLogCommand(cmd, clientSocket); } else if (cmd.startsWith("TEST_NUM=")) { bool ok; int num = cmd.mid(9).toInt(&ok); if (ok) { setTestNum(num); clientSocket->write("\r\x1B[K"); clientSocket->write("TEST_NUM updated\r\n"); } else { clientSocket->write("\r\x1B[K"); clientSocket->write("Invalid number\r\n"); } } else if (cmd.startsWith("LOG=")) { bool ok; bool flag = cmd.mid(4).toInt(&ok); if (ok) { setTestlog(flag); clientSocket->write("\r\x1B[K"); clientSocket->write("TEST_NUM updated\r\n"); } else { clientSocket->write("\r\x1B[K"); clientSocket->write("Invalid number\r\n"); } } else if (cmd.startsWith("telnettest")) { g_stopTelnetTest = false; telnetetst(clientSocket); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed telnettest warning!!! it woont stop until finish!!!\r\n"); } else if (cmd.startsWith("rc")) { rocketmq_test_rc(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed rocketmq_test_rc\r\n"); } else if (cmd.startsWith("rt")) { rocketmq_test_rt(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed rocketmq_test_rt\r\n"); } else if (cmd.startsWith("ud")) { rocketmq_test_ud(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed rocketmq_test_ud\r\n"); } else if (cmd.startsWith("set")) { rocketmq_test_set(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed rocketmq_test_set\r\n"); } else if (cmd.startsWith("only")) { rocketmq_test_only(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed rocketmq_test_only\r\n"); } else if (cmd.startsWith("log")) { rocketmq_test_log(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed rocketmq_test_log\r\n"); } else if (cmd.startsWith("soe")) { SOEFileWeb_test(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed http_test_soe\r\n"); } else if (cmd.startsWith("qvvr")) { qvvr_test(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed http_test_qvvr\r\n"); } else if (cmd.startsWith("connect")) { comflag_test(); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed http_test_connect\r\n"); } else if (cmd.startsWith("ledger")) { QStringList parts = cmd.split(" "); if (parts.size() > 1) { QString terminalId = parts[1]; ledger(terminalId.toStdString().c_str(), clientSocket); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed ledger with terminal_id\r\n"); } else { ledger(NULL, clientSocket); clientSocket->write("\r\x1B[K"); clientSocket->write("Executed ledger without parameters\r\n"); } } else if (cmd.startsWith("value")) { QStringList parts = cmd.split(" "); if (parts.size() > 1) { QString variableName = parts[1]; clientSocket->write("\r\x1B[K"); clientSocket->write("Executed value with variable name: " + variableName.toUtf8() + "\r\n"); value_print(variableName.toStdString().c_str(), clientSocket); } else { clientSocket->write("\r\x1B[K"); clientSocket->write("Please provide a variable name\r\n"); } } else if (cmd == "exit") { clientSocket->write("\r\x1B[K"); clientSocket->write("Goodbye! Exiting shell...\r\n"); clientSocket->flush(); clientSocket->disconnectFromHost(); clientSocket->waitForDisconnected(); return; } else { clientSocket->write("\r\x1B[K> "); clientSocket->write("Unknown command\r\n"); clientSocket->flush(); } // 命令处理结束后,打印提示符 printPrompt(clientSocket); } /** * @brief 上箭头:历史命令回溯 */ void handleUpArrow(QTcpSocket *clientSocket) { if (!commandHistory.isEmpty() && historyIndex > 0) { historyIndex--; currentCommand = commandHistory[historyIndex]; // 清行:\r回到行首 + \x1B[K清除光标后文字 clientSocket->write("\r\x1B[K> "); clientSocket->write(currentCommand.toUtf8()); clientSocket->flush(); } } /** * @brief 下箭头:历史命令前进 */ void handleDownArrow(QTcpSocket *clientSocket) { if (!commandHistory.isEmpty() && historyIndex < commandHistory.size() - 1) { historyIndex++; currentCommand = commandHistory[historyIndex]; clientSocket->write("\r\x1B[K> "); clientSocket->write(currentCommand.toUtf8()); clientSocket->flush(); } else if (historyIndex == commandHistory.size() - 1) { historyIndex = commandHistory.size(); currentCommand.clear(); clientSocket->write("\r\x1B[K> "); clientSocket->flush(); } } private: QTcpServer *server; QTimer *timer; int TEST_NUM; QMutex mutex; // 历史命令相关 QList commandHistory; // 存储历史命令 int historyIndex; // 当前历史命令索引 QString currentCommand; // 当前正在输入的命令 // viewlog 相关 bool stopViewLog; //ping相关 bool g_stopTelnetTest; QTcpSocket* activeClient; }; //lnk20241213 class mqconsumerThread : public QThread { protected: void run(); }; class OnTimerThread : public QThread//定时线程 { protected: void run(); }; //WW 2023-08-22 end ///////////////////////////////////////////////////////////////////////// extern "C" { #endif //lnk20250106添加台账结构 typedef struct terminal terminal; typedef struct monitor monitor; struct monitor // 监测点台账 { char monitor_id[64]; char terminal_code[64]; char monitor_name[64]; char logical_device_seq[64]; char voltage_level[64]; char terminal_connect[64]; char timestamp[64]; char status[255]; }; struct terminal // 终端台账 { char terminal_id[64]; char terminal_code[64]; char org_name[64]; char maint_name[64]; char station_name[64]; char tmnl_factory[64]; char tmnl_status[64]; char dev_type[64]; char dev_key[255]; char dev_series[255]; char processNo[64]; //lnk20250210进程号 char addr_str[64]; char port[64]; char timestamp[64]; monitor line[10]; // 最多 10 个监测点 }; #ifdef __cplusplus } #endif ///////////////////////////////////////////////////////////////////////////// #endif //SAVE2DB_8ue3hy0923r_H