Files
microser/json/save2json.h
2025-03-03 18:20:00 +08:00

578 lines
18 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file: $RCSfile: save2json.h,v $
* @brief: $IEC 61850 Protocol
*
* @version: $Revision: 1.5 $
* @date: $Date: 2018/12/23 12:39:52 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: save2json.h,v 1.5 2018/12/23 12:39:52 lizhongming Exp $
*
*/
#ifndef SAVE2DB_8ue3hy0923r_H
#define SAVE2DB_8ue3hy0923r_H
#ifdef __cplusplus
extern "C" {
#endif
extern int g_front_seg_index;
#ifdef __cplusplus
}
#endif
#ifdef __cplusplus
#include <string>
#include <vector>
#include <map>
#include <list>
#include <QThread>
//lnk20250106
#include "../include/rocketmq/SimpleProducer.h"
//日志功能
#include "../cfg_parse/custom_printf.h"//lnk20250225
#include <csignal>
#include <unistd.h>
#include <QTcpServer>
#include <QTcpSocket>
#include <QMutex>
#include <QMutexLocker>
#include <QDebug>
#include <QTimer>
#include <QCoreApplication>
#include <QStringList>
#include <iostream>
extern int G_TEST_NUM;
extern void ledger(const char* terminal_id = NULL,QIODevice* outputDevice = NULL);
extern void value_print(const char *variableName, QTcpSocket *clientSocket);
extern int TEST_PORT;
//////////////////////////////////////////////////////////////////////////////
//struct json_pair_info
//{
// string topic; //<Topic name="HISDATA" desc="历史数据"> "RTDATA" 实时数据 "RTDATASOE"实时SOE事件 等
// string data_type; // data type: 01/04 = steady-state / recalled steady-state; 02/05 = flicker / recalled flicker; 03/06 = transient / recalled transient
// string item; //<Item name="V" desc="电压" type="4" > 或"I" "PQ" 等
// string sequence; //"A" 或 "B" 或 "C" 或 "T"
// string name; //json的key
// int type; //6-值索引、9-实时SOE事件
// string mms_ref; //mms地址字符串
// float coeff; //Coefficient:数据系数
// unsigned short PltFlag; //0xffff
//};
/////////////////////////////////////////////////////////////////////////////
/**
 * @brief QThread subclass whose work is performed in run().
 *
 * run() is only declared here; it is not defined in this header.
 * Presumably the Kafka sending thread, judging by the name - confirm against
 * the implementation. Q_OBJECT is commented out, so the class declares no
 * signals or slots of its own.
 */
class KafkaSendThread : public QThread
{
    // Q_OBJECT
    // public:
protected:
    /** Thread body executed after QThread::start(). */
    void run();
};
// WW 2023-08-22: added the database thread and the WebSocket thread
/**
 * @brief QThread subclass for the database work mentioned in the note above.
 *
 * run() is only declared here; it is not defined in this header.
 */
class SQLExcuteThread : public QThread
{
protected:
    /** Thread body executed after QThread::start(). */
    void run();
};
/**
 * @brief QThread subclass; presumably the WebSocket thread, judging by the
 *        name - confirm against the implementation.
 *
 * run() is only declared here; it is not defined in this header.
 */
class WebSocketThread : public QThread
{
protected:
    /** Thread body executed after QThread::start(). */
    void run();
};
// lnk20241029
/**
 * @brief QThread subclass; run() is only declared here and is not defined in
 *        this header.
 *
 * NOTE(review): the name suggests a web/HTTP service thread - confirm against
 * the implementation.
 */
class WebhttpThread : public QThread
{
protected:
    /** Thread body executed after QThread::start(). */
    void run();
};
/**
 * @brief QThread subclass; run() is only declared here and is not defined in
 *        this header.
 *
 * NOTE(review): the name suggests an HTTP worker thread - confirm against the
 * implementation.
 */
class httpThread : public QThread
{
protected:
    /** Thread body executed after QThread::start(). */
    void run();
};
//lnk20241202
/*class mqtestThread : public QThread
{
protected:
void run();
};*/
//lnk20250106
extern bool showinshellflag;
class Worker : public QObject
{
Q_OBJECT
public:
/**
 * @brief Creates a stopped worker.
 *
 * No server, timer or viewlog client exists yet; TEST_NUM starts from the
 * global G_TEST_NUM and the history cursor starts at -1.
 *
 * The initializers are listed in member declaration order, which is the order
 * in which C++ actually runs them (this also silences -Wreorder).
 *
 * @param parent Optional QObject parent.
 */
Worker(QObject *parent = NULL)
    : QObject(parent),
      stopViewLog(false),
      activeClient(NULL),
      server(NULL),
      timer(NULL),
      TEST_NUM(G_TEST_NUM),
      historyIndex(-1)
{
}
/** @brief Shuts the worker down by releasing the server and the timer through stopServer(). */
~Worker()
{
    stopServer();
}
/**
 * @brief Handles a shell command that starts with "viewlog".
 *
 * Only declared in this header; the definition is not visible here.
 * processCommand() calls it after setting showinshellflag to true.
 *
 * @param command      The complete command line as typed by the client.
 * @param clientSocket The client connection that issued the command.
 */
void handleViewLogCommand(const QString& command, QTcpSocket* clientSocket);
public slots:
void startServer() {
if (server) {
qDebug() << "Server is already running!";
return; // 防止重复启动服务器
}
// 创建 QTcpServer 并设置信号与槽
server = new QTcpServer(this);
connect(server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
// 尝试监听端口
if (!server->listen(QHostAddress::Any, TEST_PORT)) {
std::cout << "Server failed to start!" << std::endl;
qDebug() << "Server failed to start!";
emit serverError();
return;
} else {
std::cout << "Server is running on port " << TEST_PORT << std::endl;
qDebug() << QString("Server is running on port %1").arg(TEST_PORT);
}
// 创建并启动定时器
timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), this, SLOT(doPeriodicTask()));
timer->start(60000); // 每60秒触发一次
std::cout << "Timer started, event loop running in thread: " << QThread::currentThreadId() << std::endl;
qDebug() << "Timer started, event loop running in thread:" << QThread::currentThreadId();
}
void stopServer() {
// 停止服务器并清理资源
if (server) {
server->close();
delete server;
server = NULL;
qDebug() << "Server stopped.";
}
// 停止定时器
if (timer) {
timer->stop();
delete timer;
timer = NULL;
qDebug() << "Timer stopped.";
}
}
void setTestNum(int num) {
QMutexLocker locker(&mutex);
TEST_NUM = num;
}
private slots:
/**
 * @brief Timer slot: logs TEST_NUM and, when it is non-zero, runs rocketmq_test_300().
 *
 * TEST_NUM is copied under the mutex and the lock is released before any
 * console output or the external call, so setTestNum() is never blocked for
 * the duration of rocketmq_test_300().
 */
void doPeriodicTask() {
    int testNum;
    {
        QMutexLocker locker(&mutex);
        testNum = TEST_NUM;
    }
    std::cout << "Executing TEST_NUM is " << testNum << std::endl;
    qDebug() << "doPeriodicTask() called. TEST_NUM = " << testNum;
    if (testNum != 0) {
        qDebug() << "Executing rocketmq_test_300()";
        std::cout << "Executing rocketmq_test_300()\n";
        rocketmq_test_300(testNum, g_front_seg_index);
    }
}
/**
 * @brief Accepts the next pending client and attaches it to the shell.
 *
 * The socket's readyRead() is routed to onReadyRead(), and the socket deletes
 * itself (deleteLater) once it is disconnected. A welcome banner and the
 * first prompt are then sent.
 */
void onNewConnection() {
    if (!server) {
        return;
    }
    QTcpSocket *clientSocket = server->nextPendingConnection();
    if (!clientSocket) {
        // Nothing pending: check before connect(), which would otherwise be
        // handed a null sender.
        return;
    }
    qDebug() << "New connection established!";
    std::cout << "New connection established!\n";

    // Incoming data is handled by onReadyRead().
    connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
    // Clean the socket up automatically when the client disconnects.
    connect(clientSocket, SIGNAL(disconnected()), clientSocket, SLOT(deleteLater()));

    // Greet the client and show the prompt.
    std::cout << "clientSocket OK\n";
    clientSocket->write("Welcome to the test shell. Type 'help' for available commands.\n> ");
    clientSocket->flush(); // push the banner out immediately
}
/**
* @brief 逐字节处理Telnet输入识别方向键(上下)、退格、回车等
* 当按下回车时,将当前行内容视为一条命令交给 processCommand 解析。
*/
void onReadyRead() {
QTcpSocket *clientSocket = qobject_cast<QTcpSocket *>(sender());
if (!clientSocket) {
std::cout << "Invalid socket\n";
return;
}
QByteArray data = clientSocket->readAll();
// 逐字节处理输入
for (int i = 0; i < data.size(); ++i) {
char c = data[i];
if (c == 'q') { // ? 用户输入 `q` 退出 `viewlog`
std::cout << "Received 'q' from shell socket! Exiting viewlog...\n";
if (activeClient == clientSocket) {
stopViewLog = true; // ? 让 `viewlog` 退出
clientSocket->write("\nLog view stopped. Returning to shell.\n> ");
clientSocket->flush();
}
return; // ? 立即返回,避免继续处理其他字符
}
switch (c) {
case '\r':
case '\n':
// ? 处理回车,执行命令
if (!currentCommand.isEmpty()) {
if (commandHistory.isEmpty() || commandHistory.last() != currentCommand) {
commandHistory.append(currentCommand);
}
historyIndex = commandHistory.size(); // 指向最新一条之后
processCommand(currentCommand, clientSocket);
currentCommand.clear();
} else {
clientSocket->write("\n> ");
clientSocket->flush();
}
break;
case '\x1b':
// ? 处理方向键 (上箭头 `\x1b[A`,下箭头 `\x1b[B`)
if (i + 2 < data.size() && data[i+1] == '[') {
char arrow = data[i+2];
if (arrow == 'A') {
handleUpArrow(clientSocket); // **调用上箭头处理**
} else if (arrow == 'B') {
handleDownArrow(clientSocket); // **调用下箭头处理**
}
i += 2; // ? **跳过 ESC 序列 `\x1b[A` 或 `\x1b[B`**
continue;
}
break;
case '\x7f':
case '\x08':
// ? 处理退格键
if (!currentCommand.isEmpty()) {
currentCommand.chop(1);
clientSocket->write("\b \b");
clientSocket->flush();
}
break;
default:
// ? 普通字符,追加到 `currentCommand`
currentCommand.append(c);
clientSocket->write(&c, 1);
clientSocket->flush();
break;
}
}
}
signals:
/** Emitted by startServer() when the server fails to listen on TEST_PORT. */
void serverError();
private:
// ====================== Log-view support (added later) =========================
bool stopViewLog; // Set to true by onReadyRead() when the viewing client sends 'q'; original note: no need for static here, it is per client
QTcpSocket* activeClient; // Client that is currently running `viewlog`
// --------------------
// The helper functions below were added later
// --------------------
/**
 * @brief Executes one complete command line typed in the test shell.
 *
 * This is the if / else-if logic that used to live in onReadyRead(). "help"
 * and "exit" are matched exactly; every other command is matched by prefix
 * (startsWith). After every command except "exit" a new "> " prompt is
 * written to the client.
 *
 * @param command      The command line without the terminating CR/LF.
 * @param clientSocket Connection that sent the command; all replies go to it.
 */
void processCommand(const QString &command, QTcpSocket *clientSocket) {
    // Log the command.
    qDebug() << "Received command:" << command;
    std::cout << "Received command: " << command.toStdString() << "\n";

    if (command == "help") {
        QString helpText = "Available commands:\n";
        helpText += "TEST_NUM=<num> - Set the TEST_NUM\n";
        helpText += "rc - Execute rocketmq_test_rc\n";
        helpText += "rt - Execute rocketmq_test_rt\n";
        helpText += "ud - Execute rocketmq_test_ud\n";
        helpText += "set - Execute rocketmq_test_set\n";
        helpText += "only - Execute rocketmq_test_only\n";
        helpText += "log - Execute rocketmq_test_log\n";
        helpText += "ledger <id> - Execute ledger with optional terminal_id\n";
        helpText += "viewlog <level> - View logs (ERROR, WARN, NORMAL, DEBUG)\n";
        helpText += "value <valuename> - Execute value print with valuename : iedcount frontfun frontindex remtable log\n";
        helpText += "exit - Exit the shell\n";
        helpText += "help - Show this help message\n";
        clientSocket->write(helpText.toUtf8());
    }
    else if (command.startsWith("viewlog")) {
        showinshellflag = true;
        handleViewLogCommand(command, clientSocket);
    }
    // Set the number of simulated points.
    else if (command.startsWith("TEST_NUM=")) {
        bool ok;
        int num = command.mid(9).toInt(&ok); // digits after the '=' sign
        if (ok) {
            setTestNum(num); // update TEST_NUM
            clientSocket->write("TEST_NUM updated\n");
            std::cout << "TEST_NUM updated\n";
        } else {
            clientSocket->write("Invalid number\n");
            std::cout << "Invalid number\n";
        }
    }
    // Send the recall (supplementary) data test text.
    else if (command.startsWith("rc")) {
        qDebug() << "Executing rocketmq_test_rc()";
        std::cout << "Executing rocketmq_test_rc()\n";
        rocketmq_test_rc();
        clientSocket->write("Executed rocketmq_test_rc\n");
    }
    // Send the real-time data test text.
    else if (command.startsWith("rt")) {
        qDebug() << "Executing rocketmq_test_rt()";
        std::cout << "Executing rocketmq_test_rt()\n";
        rocketmq_test_rt();
        clientSocket->write("Executed rocketmq_test_rt\n");
    }
    // Send the ledger-update test text.
    else if (command.startsWith("ud")) {
        qDebug() << "Executing rocketmq_test_ud()";
        std::cout << "Executing rocketmq_test_ud()\n";
        rocketmq_test_ud();
        clientSocket->write("Executed rocketmq_test_ud\n");
    }
    // Send the process-control test text.
    else if (command.startsWith("set")) {
        qDebug() << "Executing rocketmq_test_set()";
        std::cout << "Executing rocketmq_test_set()\n";
        rocketmq_test_set();
        clientSocket->write("Executed rocketmq_test_set\n");
    }
    // Send the single-connection process test text.
    else if (command.startsWith("only")) {
        qDebug() << "Executing rocketmq_test_only()";
        std::cout << "Executing rocketmq_test_only()\n";
        rocketmq_test_only();
        clientSocket->write("Executed rocketmq_test_only\n");
    }
    // Send the real-time log test text.
    else if (command.startsWith("log")) {
        qDebug() << "Executing rocketmq_test_log()";
        std::cout << "Executing rocketmq_test_log()\n";
        rocketmq_test_log();
        clientSocket->write("Executed rocketmq_test_log\n");
    }
    // Show the ledger of the current process.
    else if (command.startsWith("ledger")) {
        qDebug() << "Executing ledger()";
        std::cout << "Executing ledger()\n";
        // Split on spaces; the second token, if any, is the terminal id.
        QStringList parts = command.split(" ");
        if (parts.size() > 1) {
            QString terminalId = parts[1];
            std::cout << "Calling ledger with terminal_id: " << terminalId.toStdString() << std::endl;
            ledger(terminalId.toStdString().c_str(), clientSocket);
            clientSocket->write("Executed ledger with terminal_id\n");
        } else {
            std::cout << "Calling ledger without parameters\n";
            ledger(NULL, clientSocket);
            clientSocket->write("Executed ledger without parameters\n");
        }
    }
    // Show a named value of the current process.
    else if (command.startsWith("value")) {
        std::cout << "Executing value()" << std::endl;
        // Split on spaces; the second token is the variable name.
        QStringList parts = command.split(" ");
        if (parts.size() > 1) {
            QString variableName = parts[1];
            std::cout << "Calling value() with variable name: " << variableName.toStdString() << std::endl;
            // value_print() writes the variable's value to the client.
            value_print(variableName.toStdString().c_str(), clientSocket);
            clientSocket->write("Executed value with variable name: " + variableName.toUtf8() + "\n");
        } else {
            std::cout << "Calling value without parameters" << std::endl;
            clientSocket->write("Please provide a variable name\n");
        }
    }
    // Handle the exit command.
    else if (command == "exit") {
        // Say goodbye and close the client connection.
        clientSocket->write("Goodbye! Exiting shell...\n");
        clientSocket->flush();
        clientSocket->disconnectFromHost();
        // disconnectFromHost() may already have completed the close; waiting
        // on an unconnected socket only produces a Qt warning.
        if (clientSocket->state() != QAbstractSocket::UnconnectedState) {
            clientSocket->waitForDisconnected();
        }
        return; // no prompt after exit
    }
    // Unknown command.
    else {
        clientSocket->write("Unknown command\n");
    }

    // After the command has been handled, show a fresh prompt.
    clientSocket->write("> ");
    clientSocket->flush();
}
/**
 * @brief Handles the up arrow: recalls the previous command from the history.
 *
 * Does nothing when the history is empty or the cursor is already on the
 * oldest entry. The visible line is wiped with the ANSI "erase entire line"
 * sequence (ESC [ 2 K), so no stale characters remain when the recalled
 * command is shorter than what was on screen. Clients that send ANSI arrow
 * sequences are expected to understand it.
 *
 * @param clientSocket Client whose input line is redrawn.
 */
void handleUpArrow(QTcpSocket *clientSocket) {
    if (commandHistory.isEmpty() || historyIndex <= 0) {
        return;
    }
    --historyIndex;
    currentCommand = commandHistory[historyIndex];

    // Carriage return, erase the whole line, then reprint the prompt.
    clientSocket->write("\r\x1b[2K> ");
    clientSocket->write(currentCommand.toUtf8()); // echo the recalled command
    clientSocket->flush();
}
/**
 * @brief Handles the down arrow: recalls the next command from the history.
 *
 * - Cursor before the newest entry: move forward and show that entry.
 * - Cursor on the newest entry: move past it and show an empty line.
 * - Otherwise (already past the end, or no history at all): do nothing, so
 *   text that is being typed is not wiped when there is nothing to recall.
 *
 * The visible line is wiped with the ANSI "erase entire line" sequence
 * (ESC [ 2 K) so that no stale characters remain on screen.
 *
 * @param clientSocket Client whose input line is redrawn.
 */
void handleDownArrow(QTcpSocket *clientSocket) {
    if (commandHistory.isEmpty()) {
        return;
    }
    const int lastIndex = commandHistory.size() - 1;
    if (historyIndex < lastIndex) {
        ++historyIndex;
        currentCommand = commandHistory[historyIndex];
    } else if (historyIndex == lastIndex) {
        // Already on the newest entry: one more "down" clears the input.
        historyIndex = commandHistory.size();
        currentCommand.clear();
    } else {
        return;
    }

    // Carriage return, erase the whole line, then reprint the prompt.
    clientSocket->write("\r\x1b[2K> ");
    clientSocket->write(currentCommand.toUtf8()); // empty when the input was cleared
    clientSocket->flush();
}
private:
QTcpServer *server; // Listening server: created in startServer(), deleted in stopServer(), NULL while stopped
QTimer *timer; // Periodic timer started with a 60000 ms interval in startServer(); drives doPeriodicTask()
int TEST_NUM; // Initialised from G_TEST_NUM, changed via setTestNum(); doPeriodicTask() skips its test call when it is 0
QMutex mutex; // Locked in setTestNum() and doPeriodicTask() around TEST_NUM accesses
// Command-history state
QList<QString> commandHistory; // Executed command lines, oldest first
int historyIndex; // Cursor into commandHistory used by the arrow-key handlers; equals size() right after a command ran
QString currentCommand; // The line that is currently being typed
};
// lnk20241213
/**
 * @brief QThread subclass; run() is only declared here and is not defined in
 *        this header.
 *
 * NOTE(review): the name suggests the message-queue consumer thread - confirm
 * against the implementation.
 */
class mqconsumerThread : public QThread
{
protected:
    /** Thread body executed after QThread::start(). */
    void run();
};
/**
 * @brief Timer thread (per the original inline note); run() is only declared
 *        here and is not defined in this header.
 */
class OnTimerThread : public QThread // timer thread
{
protected:
    /** Thread body executed after QThread::start(). */
    void run();
};
//WW 2023-08-22 end
/////////////////////////////////////////////////////////////////////////
extern "C" {
#endif
//lnk20250106添加台账结构
typedef struct terminal terminal;
typedef struct monitor monitor;
/**
 * Ledger record of one monitoring point.
 *
 * Plain C-compatible aggregate: every field is a fixed-size character buffer.
 * Field order and sizes define the layout and must not change.
 */
struct monitor
{
    char monitor_id[64];
    char terminal_code[64];
    char monitor_name[64];
    char logical_device_seq[64];
    char voltage_level[64];
    char terminal_connect[64];
    char timestamp[64];
    char status[255];
};
/**
 * Ledger record of one terminal, including its monitoring points.
 *
 * Plain C-compatible aggregate of fixed-size character buffers followed by an
 * array of monitor records. Field order and sizes define the layout and must
 * not change.
 */
struct terminal
{
    char terminal_id[64];
    char terminal_code[64];
    char org_name[64];
    char maint_name[64];
    char station_name[64];
    char tmnl_factory[64];
    char tmnl_status[64];
    char dev_type[64];
    char dev_key[255];
    char dev_series[255];
    char processNo[64];   // lnk20250210: process number
    char addr_str[64];
    char port[64];
    char timestamp[64];
    monitor line[10];     // at most 10 monitoring points
};
#ifdef __cplusplus
}
#endif
/////////////////////////////////////////////////////////////////////////////
#endif //SAVE2DB_8ue3hy0923r_H