/** * @file: $RCSfile: save2json.h,v $ * @brief: $IEC 61850 Protocol * * @version: $Revision: 1.5 $ * @date: $Date: 2018/12/23 12:39:52 $ * @author: $Author: lizhongming $ * @state: $State: Exp $ * * @latest: $Id: save2json.h,v 1.5 2018/12/23 12:39:52 lizhongming Exp $ * */ #ifndef SAVE2DB_8ue3hy0923r_H #define SAVE2DB_8ue3hy0923r_H #ifdef __cplusplus extern "C" { #endif extern int g_front_seg_index; #ifdef __cplusplus } #endif #ifdef __cplusplus #include #include #include #include #include //lnk20250106 #include "../include/rocketmq/SimpleProducer.h" #include #include #include #include #include #include #include #include #include extern int G_TEST_NUM; extern void ledger(const char* terminal_id = NULL,QIODevice* outputDevice = NULL); extern int TEST_PORT; ////////////////////////////////////////////////////////////////////////////// //struct json_pair_info //{ // string topic; // "RTDATA" 实时数据 "RTDATASOE"实时SOE事件 等 // string data_type; //数据类型,01/04:稳态/召测稳态,02/05:闪变/召测闪变,03/06:暂态/召测暂态 // string item; // 或"I" "PQ" 等 // string sequence; //"A" 或 "B" 或 "C" 或 "T" // string name; //json的key // int type; //6-值索引、9-实时SOE事件 // string mms_ref; //mms地址字符串 // float coeff; //Coefficient:数据系数 // unsigned short PltFlag; //0xffff //}; ///////////////////////////////////////////////////////////////////////////// class KafkaSendThread : public QThread { // Q_OBJECT //public: protected: void run(); }; //WW 2023-08-22 增加数据库线程和WebSocket线程 class SQLExcuteThread : public QThread { protected: void run(); }; class WebSocketThread : public QThread { protected: void run(); }; //lnk20241029 class WebhttpThread : public QThread { protected: void run(); }; class httpThread : public QThread { protected: void run(); }; //lnk20241202 /*class mqtestThread : public QThread { protected: void run(); };*/ //lnk20250106 //使用telnet 127.0.0.1 12345进入测试 class Worker : public QObject { Q_OBJECT public: Worker(QObject *parent = NULL) : QObject(parent), server(NULL), TEST_NUM(G_TEST_NUM) {} ~Worker() { if (server) { server->close(); delete server; } } public slots: void startServer() { server = new QTcpServer(); connect(server, SIGNAL(newConnection()), this, SLOT(onNewConnection())); if (!server->listen(QHostAddress::Any, TEST_PORT)) { std::cout << "Server failed to start!" << std::endl; qDebug() << "Server failed to start!"; delete server; server = NULL; emit serverError(); return; } else { std::cout << "Server is running on port " << TEST_PORT << std::endl; qDebug() << QString("Server is running on port %1").arg(TEST_PORT); } // 创建并启动定时器 timer = new QTimer(this); connect(timer, SIGNAL(timeout()), this, SLOT(doPeriodicTask())); timer->start(60000); // 每60秒触发一次 std::cout << "Timer started, event loop running in thread: " << QThread::currentThreadId() << std::endl; qDebug() << "Timer started, event loop running in thread:" << QThread::currentThreadId(); } void setTestNum(int num) { QMutexLocker locker(&mutex); TEST_NUM = num; } private slots: void doPeriodicTask() { QMutexLocker locker(&mutex); std::cout << "Executing TEST_NUM is " << TEST_NUM << std::endl; qDebug() << "doPeriodicTask() called. TEST_NUM = " << TEST_NUM; if (TEST_NUM != 0) { qDebug() << "Executing rocketmq_test_300()"; std::cout << "Executing rocketmq_test_300()\n"; rocketmq_test_300(TEST_NUM, g_front_seg_index); } } void onNewConnection() { if (!server) return; QTcpSocket *clientSocket = server->nextPendingConnection(); qDebug() << "New connection established!"; std::cout << "New connection established!\n"; connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead())); // 向客户端发送提示符 if (clientSocket) { std::cout << "clientSocket OK\n"; clientSocket->write("Welcome to the test shell. Type 'help' for available commands.\n> "); clientSocket->flush(); // 确保消息立即发送 } } void onReadyRead() { QTcpSocket *clientSocket = qobject_cast(sender()); std::cout << "onReadyRead\n"; if (!clientSocket) { std::cout << "Invalid socket\n"; return; } std::cout << "read all\n"; QByteArray data = clientSocket->readAll(); QString command = QString::fromUtf8(data).trimmed(); // 获取输入的命令并去除前后空格 qDebug() << "Received command:" << command; std::cout << "Received command: " << command.toStdString() << "\n"; // 向客户端发送“指令已输入”反馈 clientSocket->write("Received command\n> "); clientSocket->flush(); // 重新显示提示符 clientSocket->write("test_shell> "); clientSocket->flush(); // 确保提示符立即显示 // 处理 TEST_NUM if (command.startsWith("TEST_NUM=")) { bool ok; int num = command.mid(9).toInt(&ok); // 获取等号后面的数字部分 if (ok) { setTestNum(num); // 更新 TEST_NUM clientSocket->write("TEST_NUM updated\n> "); std::cout << "TEST_NUM updated\n"; } else { clientSocket->write("Invalid number\n> "); std::cout << "Invalid number\n"; } } // 处理 rc 命令 else if (command.startsWith("rc")) { qDebug() << "Executing rocketmq_test_rc()"; std::cout << "Executing rocketmq_test_rc()\n"; rocketmq_test_rc(); // 调用 rc 函数 clientSocket->write("Executed rocketmq_test_rc\n> "); } // 处理 rt 命令 else if (command.startsWith("rt")) { qDebug() << "Executing rocketmq_test_rt()"; std::cout << "Executing rocketmq_test_rt()\n"; rocketmq_test_rt(); // 调用 rt 函数 clientSocket->write("Executed rocketmq_test_rt\n> "); } // 处理 ud 命令 else if (command.startsWith("ud")) { qDebug() << "Executing rocketmq_test_ud()"; std::cout << "Executing rocketmq_test_ud()\n"; rocketmq_test_ud(); // 调用 ud 函数 clientSocket->write("Executed rocketmq_test_ud\n> "); } else if (command.startsWith("set")) { qDebug() << "Executing rocketmq_test_set()"; std::cout << "Executing rocketmq_test_set()\n"; rocketmq_test_set(); // 调用 set 函数 clientSocket->write("Executed rocketmq_test_set\n> "); } else if (command.startsWith("ledger")) { qDebug() << "Executing ledger()"; std::cout << "Executing ledger()\n"; // 提取参数 QStringList parts = command.split(" "); // 根据空格分割命令 if (parts.size() > 1) { // 如果命令中包含参数(即 id号) QString terminalId = parts[1]; std::cout << "Calling ledger with terminal_id: " << terminalId.toStdString() << std::endl; // 修改:调用 ledger 时传递 clientSocket 作为输出设备 ledger(terminalId.toStdString().c_str(), clientSocket); // 调用带参数的 ledger,传递 clientSocket clientSocket->write("Executed ledger with terminal_id\n> "); } else { std::cout << "Calling ledger without parameters\n"; // 修改:调用无参数的 ledger,传递 clientSocket ledger(NULL, clientSocket); // 调用无参数的 ledger,传递 clientSocket clientSocket->write("Executed ledger without parameters\n> "); } } // 处理未知命令 else { clientSocket->write("Unknown command\n> "); } clientSocket->flush(); } signals: void serverError(); private: QTcpServer *server; QTimer *timer; int TEST_NUM; QMutex mutex; }; //lnk20241213 class mqconsumerThread : public QThread { protected: void run(); }; class OnTimerThread : public QThread//定时线程 { protected: void run(); }; //WW 2023-08-22 end ///////////////////////////////////////////////////////////////////////// extern "C" { #endif //lnk20250106添加台账结构 typedef struct terminal terminal; typedef struct monitor monitor; struct monitor // 监测点台账 { char monitor_id[64]; char terminal_code[64]; char monitor_name[64]; char logical_device_seq[64]; char voltage_level[64]; char terminal_connect[64]; char timestamp[64]; char status[255]; }; struct terminal // 终端台账 { char terminal_id[64]; char terminal_code[64]; char org_name[64]; char maint_name[64]; char station_name[64]; char tmnl_factory[64]; char tmnl_status[64]; char dev_type[64]; char dev_key[255]; char dev_series[255]; char addr_str[64]; char port[64]; char timestamp[64]; monitor line[10]; // 最多 10 个监测点 }; #ifdef __cplusplus } #endif ///////////////////////////////////////////////////////////////////////////// #endif //SAVE2DB_8ue3hy0923r_H