/** * @file: $RCSfile: save2json.h,v $ * @brief: $IEC 61850 Protocol * * @version: $Revision: 1.5 $ * @date: $Date: 2018/12/23 12:39:52 $ * @author: $Author: lizhongming $ * @state: $State: Exp $ * * @latest: $Id: save2json.h,v 1.5 2018/12/23 12:39:52 lizhongming Exp $ * */ #ifndef SAVE2DB_8ue3hy0923r_H #define SAVE2DB_8ue3hy0923r_H #ifdef __cplusplus extern "C" { #endif extern int g_front_seg_index; #ifdef __cplusplus } #endif #ifdef __cplusplus #include #include #include #include #include //lnk20250106 #include "../include/rocketmq/SimpleProducer.h" #include #include #include #include #include #include #include #include #include extern int G_TEST_NUM; extern void ledger(const char* terminal_id = NULL,QIODevice* outputDevice = NULL); extern void value_print(const char *variableName, QTcpSocket *clientSocket); extern int TEST_PORT; ////////////////////////////////////////////////////////////////////////////// //struct json_pair_info //{ // string topic; // "RTDATA" 实时数据 "RTDATASOE"实时SOE事件 等 // string data_type; //数据类型,01/04:稳态/召测稳态,02/05:闪变/召测闪变,03/06:暂态/召测暂态 // string item; // 或"I" "PQ" 等 // string sequence; //"A" 或 "B" 或 "C" 或 "T" // string name; //json的key // int type; //6-值索引、9-实时SOE事件 // string mms_ref; //mms地址字符串 // float coeff; //Coefficient:数据系数 // unsigned short PltFlag; //0xffff //}; ///////////////////////////////////////////////////////////////////////////// class KafkaSendThread : public QThread { // Q_OBJECT //public: protected: void run(); }; //WW 2023-08-22 增加数据库线程和WebSocket线程 class SQLExcuteThread : public QThread { protected: void run(); }; class WebSocketThread : public QThread { protected: void run(); }; //lnk20241029 class WebhttpThread : public QThread { protected: void run(); }; class httpThread : public QThread { protected: void run(); }; //lnk20241202 /*class mqtestThread : public QThread { protected: void run(); };*/ //lnk20250106 //使用telnet 127.0.0.1 12345进入测试 class Worker : public QObject { Q_OBJECT public: Worker(QObject *parent = NULL) : QObject(parent), server(NULL), TEST_NUM(G_TEST_NUM) { timer = NULL; } ~Worker() { // 清理定时器和服务器 stopServer(); } public slots: void startServer() { if (server) { qDebug() << "Server is already running!"; return; // 防止重复启动服务器 } // 创建 QTcpServer 并设置信号与槽 server = new QTcpServer(this); connect(server, SIGNAL(newConnection()), this, SLOT(onNewConnection())); // 尝试监听端口 if (!server->listen(QHostAddress::Any, TEST_PORT)) { std::cout << "Server failed to start!" << std::endl; qDebug() << "Server failed to start!"; emit serverError(); return; } else { std::cout << "Server is running on port " << TEST_PORT << std::endl; qDebug() << QString("Server is running on port %1").arg(TEST_PORT); } // 创建并启动定时器 timer = new QTimer(this); connect(timer, SIGNAL(timeout()), this, SLOT(doPeriodicTask())); timer->start(60000); // 每60秒触发一次 std::cout << "Timer started, event loop running in thread: " << QThread::currentThreadId() << std::endl; qDebug() << "Timer started, event loop running in thread:" << QThread::currentThreadId(); } void stopServer() { // 停止服务器并清理资源 if (server) { server->close(); delete server; server = NULL; qDebug() << "Server stopped."; } // 停止定时器 if (timer) { timer->stop(); delete timer; timer = NULL; qDebug() << "Timer stopped."; } } void setTestNum(int num) { QMutexLocker locker(&mutex); TEST_NUM = num; } private slots: void doPeriodicTask() { QMutexLocker locker(&mutex); std::cout << "Executing TEST_NUM is " << TEST_NUM << std::endl; qDebug() << "doPeriodicTask() called. TEST_NUM = " << TEST_NUM; if (TEST_NUM != 0) { qDebug() << "Executing rocketmq_test_300()"; std::cout << "Executing rocketmq_test_300()\n"; rocketmq_test_300(TEST_NUM, g_front_seg_index); } } void onNewConnection() { if (!server) return; QTcpSocket *clientSocket = server->nextPendingConnection(); qDebug() << "New connection established!"; std::cout << "New connection established!\n"; connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead())); connect(clientSocket, SIGNAL(disconnected()), clientSocket, SLOT(deleteLater())); // 自动清理连接 // 向客户端发送提示符 if (clientSocket) { std::cout << "clientSocket OK\n"; clientSocket->write("Welcome to the test shell. Type 'help' for available commands.\n> "); clientSocket->flush(); // 确保消息立即发送 } } void onReadyRead() { QTcpSocket *clientSocket = qobject_cast(sender()); if (!clientSocket) { std::cout << "Invalid socket\n"; return; } QByteArray data = clientSocket->readAll(); QString command = QString::fromUtf8(data).trimmed(); // 获取输入的命令并去除前后空格 qDebug() << "Received command:" << command; std::cout << "Received command: " << command.toStdString() << "\n"; // 存储历史命令 if (!command.isEmpty() && (commandHistory.isEmpty() || command != commandHistory.last())) { commandHistory.append(command); historyIndex = commandHistory.size(); // 新命令的索引 } // 向客户端发送“指令已输入”反馈 clientSocket->write("Received command\n> "); clientSocket->flush(); clientSocket->write("test_shell> "); clientSocket->flush(); // 确保提示符立即显示 // 处理 help 命令 if (command == "help") { QString helpText = "Available commands:\n"; helpText += "TEST_NUM= - Set the TEST_NUM\n"; helpText += "rc - Execute rocketmq_test_rc\n"; helpText += "rt - Execute rocketmq_test_rt\n"; helpText += "ud - Execute rocketmq_test_ud\n"; helpText += "set - Execute rocketmq_test_set\n"; helpText += "ledger - Execute ledger with optional terminal_id\n"; helpText += "value - Execute value print with valuename : iedcount frontfun frontindex\n"; helpText += "exit - Exit the shell\n"; helpText += "help - Show this help message\n"; clientSocket->write(helpText.toUtf8()); clientSocket->flush(); } // 处理 TEST_NUM else if (command.startsWith("TEST_NUM=")) { bool ok; int num = command.mid(9).toInt(&ok); // 获取等号后面的数字部分 if (ok) { setTestNum(num); // 更新 TEST_NUM clientSocket->write("TEST_NUM updated\n> "); std::cout << "TEST_NUM updated\n"; } else { clientSocket->write("Invalid number\n> "); std::cout << "Invalid number\n"; } } // 处理 rc 命令 else if (command.startsWith("rc")) { qDebug() << "Executing rocketmq_test_rc()"; std::cout << "Executing rocketmq_test_rc()\n"; rocketmq_test_rc(); // 调用 rc 函数 clientSocket->write("Executed rocketmq_test_rc\n> "); } // 处理 rt 命令 else if (command.startsWith("rt")) { qDebug() << "Executing rocketmq_test_rt()"; std::cout << "Executing rocketmq_test_rt()\n"; rocketmq_test_rt(); // 调用 rt 函数 clientSocket->write("Executed rocketmq_test_rt\n> "); } // 处理 ud 命令 else if (command.startsWith("ud")) { qDebug() << "Executing rocketmq_test_ud()"; std::cout << "Executing rocketmq_test_ud()\n"; rocketmq_test_ud(); // 调用 ud 函数 clientSocket->write("Executed rocketmq_test_ud\n> "); } else if (command.startsWith("set")) { qDebug() << "Executing rocketmq_test_set()"; std::cout << "Executing rocketmq_test_set()\n"; rocketmq_test_set(); // 调用 set 函数 clientSocket->write("Executed rocketmq_test_set\n> "); } else if (command.startsWith("ledger")) { qDebug() << "Executing ledger()"; std::cout << "Executing ledger()\n"; // 提取参数 QStringList parts = command.split(" "); // 根据空格分割命令 if (parts.size() > 1) { // 如果命令中包含参数(即 id号) QString terminalId = parts[1]; std::cout << "Calling ledger with terminal_id: " << terminalId.toStdString() << std::endl; // 修改:调用 ledger 时传递 clientSocket 作为输出设备 ledger(terminalId.toStdString().c_str(), clientSocket); // 调用带参数的 ledger,传递 clientSocket clientSocket->write("Executed ledger with terminal_id\n> "); } else { std::cout << "Calling ledger without parameters\n"; // 修改:调用无参数的 ledger,传递 clientSocket ledger(NULL, clientSocket); // 调用无参数的 ledger,传递 clientSocket clientSocket->write("Executed ledger without parameters\n> "); } } else if (command.startsWith("value")) { std::cout << "Executing value()" << std::endl; // 提取命令中的参数,获取变量名 QStringList parts = command.split(" "); // 根据空格分割命令 if (parts.size() > 1) { // 如果命令中包含参数 QString variableName = parts[1]; std::cout << "Calling value() with variable name: " << variableName.toStdString() << std::endl; // 调用 value() 输出变量值 value_print(variableName.toStdString().c_str(), clientSocket); // 假设 value() 函数可以根据变量名输出值 clientSocket->write("Executed value with variable name: " + variableName.toUtf8() + "\n> "); } else { std::cout << "Calling value without parameters" << std::endl; // 提示用户需要提供变量名 clientSocket->write("Please provide a variable name\n> "); } } // 处理历史命令 else if (command == "up" || command == "down") { // 上下箭头命令处理 if (command == "up") { // 上箭头:选择前一个历史命令 if (historyIndex > 0) { historyIndex--; } } else if (command == "down") { // 下箭头:选择下一个历史命令 if (historyIndex < commandHistory.size() - 1) { historyIndex++; } } // 如果有历史命令,返回历史命令 if (historyIndex >= 0 && historyIndex < commandHistory.size()) { clientSocket->write(commandHistory[historyIndex].toUtf8()); } else { clientSocket->write("No history available\n> "); } clientSocket->flush(); } // 处理 exit 命令 else if (command == "exit") { // 发送退出信息并关闭客户端连接 clientSocket->write("Goodbye! Exiting shell...\n"); clientSocket->flush(); clientSocket->disconnectFromHost(); // 关闭连接 clientSocket->waitForDisconnected(); // 确保连接断开 } // 处理未知命令 else { clientSocket->write("Unknown command\n> "); } clientSocket->flush(); } signals: void serverError(); private: QList commandHistory; // 存储历史命令 int historyIndex = -1; // 当前历史命令索引 QTcpServer *server; QTimer *timer; int TEST_NUM; QMutex mutex; }; //lnk20241213 class mqconsumerThread : public QThread { protected: void run(); }; class OnTimerThread : public QThread//定时线程 { protected: void run(); }; //WW 2023-08-22 end ///////////////////////////////////////////////////////////////////////// extern "C" { #endif //lnk20250106添加台账结构 typedef struct terminal terminal; typedef struct monitor monitor; struct monitor // 监测点台账 { char monitor_id[64]; char terminal_code[64]; char monitor_name[64]; char logical_device_seq[64]; char voltage_level[64]; char terminal_connect[64]; char timestamp[64]; char status[255]; }; struct terminal // 终端台账 { char terminal_id[64]; char terminal_code[64]; char org_name[64]; char maint_name[64]; char station_name[64]; char tmnl_factory[64]; char tmnl_status[64]; char dev_type[64]; char dev_key[255]; char dev_series[255]; char processNo[64]; //lnk20250210进程号 char addr_str[64]; char port[64]; char timestamp[64]; monitor line[10]; // 最多 10 个监测点 }; #ifdef __cplusplus } #endif ///////////////////////////////////////////////////////////////////////////// #endif //SAVE2DB_8ue3hy0923r_H