Files
microser/json/save2json.h
2025-03-07 18:27:03 +08:00

693 lines
21 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file: $RCSfile: save2json.h,v $
* @brief: $IEC 61850 Protocol
*
* @version: $Revision: 1.5 $
* @date: $Date: 2018/12/23 12:39:52 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: save2json.h,v 1.5 2018/12/23 12:39:52 lizhongming Exp $
*
*/
#ifndef SAVE2DB_8ue3hy0923r_H
#define SAVE2DB_8ue3hy0923r_H
#ifdef __cplusplus
extern "C" {
#endif
// C-linkage global defined elsewhere. In this header it is only read:
// Worker::doPeriodicTask() passes it as the second argument of rocketmq_test_300().
extern int g_front_seg_index;
#ifdef __cplusplus
}
#endif
#ifdef __cplusplus
#include <string>
#include <vector>
#include <map>
#include <list>
#include "../mms/db_interface.h"
#include <QThread>
//lnk20250106
#include "../include/rocketmq/SimpleProducer.h"
//日志功能
#include "../cfg_parse/custom_printf.h"//lnk20250225
#include <csignal>
#include <unistd.h>
#include <QTcpServer>
#include <QTcpSocket>
#include <QMutex>
#include <QMutexLocker>
#include <QDebug>
#include <QTimer>
#include <QCoreApplication>
#include <QStringList>
#include <iostream>
// Value copied into Worker::TEST_NUM when a Worker is constructed.
extern int G_TEST_NUM;
// Called by the shell's "ledger" command. terminal_id is the word typed after the
// command (NULL when none was given); the Worker passes the client socket as outputDevice.
extern void ledger(const char* terminal_id = NULL,QIODevice* outputDevice = NULL);
// Called by the shell's "value <name>" command with the typed name and the client socket.
extern void value_print(const char *variableName, QTcpSocket *clientSocket);
// TCP port that Worker::startServer() listens on.
extern int TEST_PORT;
// Output redirection switches, one per log level. Worker::setTestlog() calls all four
// with the same flag. Defined elsewhere; their exact effect is not visible in this header.
extern void redirectErrorOutput(bool enable);
extern void redirectWarnOutput(bool enable);
extern void redirectNormalOutput(bool enable);
extern void redirectDebugOutput(bool enable);
//////////////////////////////////////////////////////////////////////////////
//struct json_pair_info
//{
// string topic; //<Topic name="HISDATA" desc="历史数据"> "RTDATA" 实时数据 "RTDATASOE"实时SOE事件 等
// string data_type; //数据类型01/04:稳态/召测稳态02/05:闪变/召测闪变03/06:暂态/召测暂态
// string item; //<Item name="V" desc="电压" type="4" > 或"I" "PQ" 等
// string sequence; //"A" 或 "B" 或 "C" 或 "T"
// string name; //json的key
// int type; //6-值索引、9-实时SOE事件
// string mms_ref; //mms地址字符串
// float coeff; //Coefficient:数据系数
// unsigned short PltFlag; //0xffff
//};
/////////////////////////////////////////////////////////////////////////////
/**
 * @brief QThread subclass whose thread body, run(), is implemented outside this header.
 *
 * NOTE(review): the name suggests a Kafka sending loop - confirm in the run() definition.
 */
class KafkaSendThread : public QThread
{
    // Q_OBJECT
    //public:
protected:
    /// Thread entry point; replaces QThread::run().
    virtual void run();
};
// WW 2023-08-22: added the database thread and the WebSocket thread
/**
 * @brief QThread subclass (the "database thread" per the change note above);
 *        run() is implemented outside this header.
 */
class SQLExcuteThread : public QThread
{
protected:
    /// Thread entry point; replaces QThread::run().
    virtual void run();
};
/**
 * @brief QThread subclass whose thread body, run(), is implemented outside this header.
 *
 * NOTE(review): presumably the WebSocket thread - confirm in the run() definition.
 */
class WebSocketThread : public QThread
{
protected:
    /// Thread entry point; replaces QThread::run().
    virtual void run();
};
// lnk20241029
/**
 * @brief QThread subclass whose thread body, run(), is implemented outside this header.
 *
 * NOTE(review): purpose is not visible here; the name suggests web/HTTP handling - confirm.
 */
class WebhttpThread : public QThread
{
protected:
    /// Thread entry point; replaces QThread::run().
    virtual void run();
};
/**
 * @brief QThread subclass whose thread body, run(), is implemented outside this header.
 *
 * NOTE(review): purpose is not visible here; the name suggests HTTP handling - confirm.
 */
class httpThread : public QThread
{
protected:
    /// Thread entry point; replaces QThread::run().
    virtual void run();
};
//lnk20241202
/*class mqtestThread : public QThread
{
protected:
void run();
};*/
// lnk20250106
// Global flag defined elsewhere. Worker::processCommand() sets it to true when a
// "viewlog" command is received; nothing in this header reads or clears it.
extern bool showinshellflag;
#ifdef __cplusplus
extern "C" {
#endif
// C-linkage monitoring routine defined elsewhere. Worker::doMonitorTask() calls it on
// every tick of the 1-second monitor timer started by Worker::startServer().
void doMonitorTaskmain(void);
#ifdef __cplusplus
}
#endif
// ====================== Telnet constants ======================
// Telnet command bytes. Worker::onReadyRead() skips incoming sequences that start with
// IAC, and Worker::sendTelnetNegotiation() builds its outgoing offers from them.
#define IAC 255 // Interpret As Command
#define DONT 254
#define TELDO 253
#define WONT 252
#define WILL 251
// Telnet option codes used in the negotiation sent to each new client:
#define TELOPT_ECHO 1 // echo
#define TELOPT_SUPPRESS_GO_AHEAD 3 // SGA
#define TELOPT_LINEMODE 34 // line mode
/**
* @brief Worker类包含启动服务器、处理Telnet交互等逻辑
*/
class Worker : public QObject
{
Q_OBJECT
public:
/**
 * @brief Construct an idle worker: no server, no timer, no active client.
 *
 * TEST_NUM starts from the global G_TEST_NUM. The history index starts at -1
 * (empty history) and both stop flags start out true.
 *
 * The initializer list follows the member declaration order
 * (server, timer, TEST_NUM, ...) so it matches the order in which the
 * members are actually initialized and does not trigger -Wreorder.
 *
 * @param parent Optional QObject parent.
 */
Worker(QObject *parent = NULL)
    : QObject(parent),
      server(NULL),
      timer(NULL),
      TEST_NUM(G_TEST_NUM),
      historyIndex(-1),
      stopViewLog(true),
      g_stopTelnetTest(true),
      activeClient(NULL)
{
}
/**
 * @brief Tear down the listening server and the periodic timer via stopServer().
 */
~Worker() {
    stopServer();
}
// Handles a command line that starts with "viewlog"; called from processCommand()
// with the full command text and the requesting client. Defined outside this header.
void handleViewLogCommand(const QString& command, QTcpSocket* clientSocket);
// Defined outside this header and not called from any code visible here.
// NOTE(review): presumably prepares the ping/telnet test and reports counts through
// ip_count / telnet_count - confirm against the definition.
int init_ping_telnet(QTcpSocket* clientSocket, int& ip_count, int& telnet_count);
// Runs the "telnettest" shell command; called synchronously from processCommand().
// Defined outside this header.
void telnetetst(QTcpSocket* clientSocket);
public slots:
/**
* @brief 启动服务器
*/
void startServer() {
if (server) {
qDebug() << "Server is already running!";
return; // 防止重复启动服务器
}
// 创建 QTcpServer 并设置信号与槽
server = new QTcpServer(this);
connect(server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
// 尝试监听端口
if (!server->listen(QHostAddress::Any, TEST_PORT)) {
std::cout << "Server failed to start!" << std::endl;
qDebug() << "Server failed to start!";
emit serverError();
return;
} else {
std::cout << "Server is running on port " << TEST_PORT << std::endl;
qDebug() << QString("Server is running on port %1").arg(TEST_PORT);
}
// 创建并启动定时器
timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), this, SLOT(doPeriodicTask()));
timer->start(60000); // 每60秒触发一次
// 开启另一个周期函数用来替换主线程的监控
QTimer *monitorTimer = new QTimer(this);
connect(monitorTimer, SIGNAL(timeout()), this, SLOT(doMonitorTask()));
monitorTimer->start(1000); // 每1秒触发一次
std::cout << "Timer started, event loop running in thread: "
<< QThread::currentThreadId() << std::endl;
qDebug() << "Timer started, event loop running in thread:" << QThread::currentThreadId();
}
/**
* @brief 停止服务器
*/
void stopServer() {
// 停止服务器并清理资源
if (server) {
server->close();
delete server;
server = NULL;
qDebug() << "Server stopped.";
}
// 停止定时器
if (timer) {
timer->stop();
delete timer;
timer = NULL;
qDebug() << "Timer stopped.";
}
}
/**
* @brief 设置TEST_NUM
*/
void setTestNum(int num) {
QMutexLocker locker(&mutex);
TEST_NUM = num;
}
/**
 * @brief Pass the same flag to all four output-redirection switches.
 *
 * Call order is error, warn, normal, debug.
 *
 * @param flag Value forwarded to every redirect*Output() function.
 */
void setTestlog(bool flag) {
    typedef void (*RedirectFn)(bool);
    const RedirectFn redirectors[] = {
        redirectErrorOutput,
        redirectWarnOutput,
        redirectNormalOutput,
        redirectDebugOutput
    };
    const unsigned redirectorCount = sizeof(redirectors) / sizeof(redirectors[0]);
    for (unsigned idx = 0; idx < redirectorCount; ++idx) {
        redirectors[idx](flag);
    }
}
private slots:
/**
* @brief 定时任务
*/
void doPeriodicTask() {
QMutexLocker locker(&mutex);
std::cout << "Executing TEST_NUM is " << TEST_NUM << std::endl;
qDebug() << "doPeriodicTask() called. TEST_NUM = " << TEST_NUM;
if (TEST_NUM != 0) {
qDebug() << "Executing rocketmq_test_300()";
std::cout << "Executing rocketmq_test_300()\n";
rocketmq_test_300(TEST_NUM, g_front_seg_index);
}
}
/**
* @brief 监控任务
*/
void doMonitorTask() {
doMonitorTaskmain();
}
/**
 * @brief Slot for QTcpServer::newConnection(): set up one newly connected client.
 *
 * Wires the socket's readyRead() to onReadyRead() and its disconnected() to
 * deleteLater(), sends the Telnet option negotiation, then the welcome banner
 * and the first prompt.
 *
 * nextPendingConnection() may return 0, so the socket is checked before it is
 * used for connect() or any write.
 */
void onNewConnection() {
    if (!server) return;

    QTcpSocket *clientSocket = server->nextPendingConnection();
    if (!clientSocket) {
        // Nothing is actually pending; there is no socket to set up.
        return;
    }
    qDebug() << "New connection established!";
    std::cout << "New connection established!\n";

    // Input handling and automatic cleanup on disconnect.
    connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
    connect(clientSocket, SIGNAL(disconnected()), clientSocket, SLOT(deleteLater()));

    // Telnet option negotiation.
    sendTelnetNegotiation(clientSocket);

    // Welcome banner and first prompt.
    std::cout << "clientSocket OK\n";
    clientSocket->write("\r\x1B[K");
    clientSocket->write("Welcome to the test shell. Type 'help' for available commands.\r\n");
    printPrompt(clientSocket);
}
/**
* @brief 处理客户端发送的Telnet数据
*/
void onReadyRead() {
QTcpSocket *clientSocket = qobject_cast<QTcpSocket *>(sender());
if (!clientSocket) {
std::cout << "Invalid socket\n";
return;
}
QByteArray data = clientSocket->readAll();
for (int i = 0; i < data.size(); ++i) {
unsigned char c = static_cast<unsigned char>(data[i]);
// 如果检测到 IAC(255),说明是 Telnet 协商指令
if (c == IAC) {
// 简单跳过 TELDO/DONT/WILL/WONT + option
if (i + 1 < data.size()) {
unsigned char cmd = static_cast<unsigned char>(data[i+1]);
if (cmd == TELDO || cmd == DONT || cmd == WILL || cmd == WONT) {
i += 2; // 跳过这2字节
} else {
// 遇到其它情况(比如IAC SB),此处仅简单跳过一个字节
i += 1;
}
}
continue;
}
// 1) 处理 '`' 退出 viewlog 和ping
if (c == '`') {
std::cout << "Received '`' from shell socket! Exiting viewlog...\n";
if (activeClient == clientSocket) {
stopViewLog = true;
g_stopTelnetTest = true;
clientSocket->write("\r\x1B[K");
clientSocket->write("\r\nLog view stopped. Returning to shell.\r\n");
printPrompt(clientSocket);
}
return;
}
// 2) 回车换行:执行命令
if (c == '\r' || c == '\n') {
if (!currentCommand.isEmpty()) {
// 加到历史
if (commandHistory.isEmpty() || commandHistory.last() != currentCommand) {
commandHistory.append(currentCommand);
}
historyIndex = commandHistory.size();
// 执行命令时,忽略前后空白
//QString trimmedCmd = currentCommand.trimmed();
currentCommand.remove(0, 1);
processCommand(currentCommand, clientSocket);
currentCommand.clear();
} else {
// 空行 => 仅打印新的提示符
printPrompt(clientSocket);
}
continue;
}
// 3) 方向键
if (c == '\x1b') {
if (i + 2 < data.size() && data[i+1] == '[') {
char arrow = data[i+2];
if (arrow == 'A') {
handleUpArrow(clientSocket);
} else if (arrow == 'B') {
handleDownArrow(clientSocket);
}
i += 2;
}
continue;
}
// 假设提示符固定为 "> "提示符长度为2
const int promptLength = 1;
// 4) 退格键
if (c == '\x7f' || c == '\b') {
// 仅当 currentCommand 的长度大于提示符长度时,允许删除字符
if (currentCommand.length() > promptLength) {
currentCommand.chop(1);
// 回显退格:用 "\b \b" 来擦除屏幕上最后一个字符
clientSocket->write("\b \b");
clientSocket->flush();
}
continue;
}
// 5) 普通字符
currentCommand.append(static_cast<char>(c));
clientSocket->write((const char*)&c, 1);
clientSocket->flush();
}
}
signals:
// Emitted by startServer() when the QTcpServer fails to listen on TEST_PORT.
void serverError();
private:
// ========== Telnet negotiation ==========
/**
 * @brief Send the initial Telnet option offers to a client and flush them.
 *
 * Three 3-byte sequences are written, in this order:
 * IAC WILL ECHO, IAC WILL SUPPRESS-GO-AHEAD, IAC DONT LINEMODE.
 *
 * @param socket Client connection; must not be NULL.
 */
void sendTelnetNegotiation(QTcpSocket *socket)
{
    static const unsigned char offers[][3] = {
        { IAC, WILL, TELOPT_ECHO },
        { IAC, WILL, TELOPT_SUPPRESS_GO_AHEAD },
        { IAC, DONT, TELOPT_LINEMODE }
    };
    const int offerCount = static_cast<int>(sizeof(offers) / sizeof(offers[0]));
    for (int n = 0; n < offerCount; ++n) {
        socket->write(reinterpret_cast<const char *>(offers[n]), 3);
    }
    socket->flush();
}
/**
 * @brief Print the shell prompt on a fresh line.
 *
 * Writes LF, CR, the ANSI "erase to end of line" sequence and then "> ",
 * and flushes the socket.
 *
 * @param clientSocket Client connection; must not be NULL.
 */
void printPrompt(QTcpSocket *clientSocket)
{
    static const char promptSequence[] = "\n\r\x1B[K> ";
    clientSocket->write(promptSequence);
    clientSocket->flush();
}
/**
* @brief 执行一条命令已被trimmed
*/
void processCommand(const QString &cmd, QTcpSocket *clientSocket) {
qDebug() << "Received command:" << cmd;
std::cout << "Received command: " << cmd.toStdString() << "\n";
// 命令解析
if (cmd == "help") {
QString helpText = "Available commands:\r\n";
helpText += "TEST_NUM=<num> - Set the TEST_NUM\r\n";
helpText += "LOG=<bool> - Set the LOG\r\n";
helpText += "telnettest - Set the telnettest\r\n";
helpText += "rc - Execute rocketmq_test_rc\r\n";
helpText += "rt - Execute rocketmq_test_rt\r\n";
helpText += "ud - Execute rocketmq_test_ud\r\n";
helpText += "set - Execute rocketmq_test_set\r\n";
helpText += "only - Execute rocketmq_test_only\r\n";
helpText += "log - Execute rocketmq_test_log\r\n";
helpText += "soe - Execute http_test_soe\r\n";
helpText += "qvvr - Execute http_test_qvvr\r\n";
helpText += "connect - Execute http_test_connect\r\n";
helpText += "ledger <id> - Execute ledger with optional terminal_id\r\n";
helpText += "viewlog <level> - View logs (ERROR, WARN, NORMAL, DEBUG)\r\n";
helpText += "value <valuename> - Execute value print with valuename : frontindex remtable iedcount frontfun log init\r\n";
helpText += "exit - Exit the shell\r\n";
helpText += "help - Show this help message\r\n";
clientSocket->write("\r\x1B[K");
clientSocket->write(helpText.toUtf8());
}
else if (cmd.startsWith("viewlog")) {
showinshellflag = true;
handleViewLogCommand(cmd, clientSocket);
}
else if (cmd.startsWith("TEST_NUM=")) {
bool ok;
int num = cmd.mid(9).toInt(&ok);
if (ok) {
setTestNum(num);
clientSocket->write("\r\x1B[K");
clientSocket->write("TEST_NUM updated\r\n");
} else {
clientSocket->write("\r\x1B[K");
clientSocket->write("Invalid number\r\n");
}
}
else if (cmd.startsWith("LOG=")) {
bool ok;
bool flag = cmd.mid(4).toInt(&ok);
if (ok) {
setTestlog(flag);
clientSocket->write("\r\x1B[K");
clientSocket->write("TEST_NUM updated\r\n");
} else {
clientSocket->write("\r\x1B[K");
clientSocket->write("Invalid number\r\n");
}
}
else if (cmd.startsWith("telnettest")) {
g_stopTelnetTest = false;
telnetetst(clientSocket);
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed telnettest warning!!! it woont stop until finish!!!\r\n");
}
else if (cmd.startsWith("rc")) {
rocketmq_test_rc();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed rocketmq_test_rc\r\n");
}
else if (cmd.startsWith("rt")) {
rocketmq_test_rt();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed rocketmq_test_rt\r\n");
}
else if (cmd.startsWith("ud")) {
rocketmq_test_ud();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed rocketmq_test_ud\r\n");
}
else if (cmd.startsWith("set")) {
rocketmq_test_set();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed rocketmq_test_set\r\n");
}
else if (cmd.startsWith("only")) {
rocketmq_test_only();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed rocketmq_test_only\r\n");
}
else if (cmd.startsWith("log")) {
rocketmq_test_log();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed rocketmq_test_log\r\n");
}
else if (cmd.startsWith("soe")) {
SOEFileWeb_test();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed http_test_soe\r\n");
}
else if (cmd.startsWith("qvvr")) {
qvvr_test();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed http_test_qvvr\r\n");
}
else if (cmd.startsWith("connect")) {
comflag_test();
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed http_test_connect\r\n");
}
else if (cmd.startsWith("ledger")) {
QStringList parts = cmd.split(" ");
if (parts.size() > 1) {
QString terminalId = parts[1];
ledger(terminalId.toStdString().c_str(), clientSocket);
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed ledger with terminal_id\r\n");
} else {
ledger(NULL, clientSocket);
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed ledger without parameters\r\n");
}
}
else if (cmd.startsWith("value")) {
QStringList parts = cmd.split(" ");
if (parts.size() > 1) {
QString variableName = parts[1];
clientSocket->write("\r\x1B[K");
clientSocket->write("Executed value with variable name: " + variableName.toUtf8() + "\r\n");
value_print(variableName.toStdString().c_str(), clientSocket);
} else {
clientSocket->write("\r\x1B[K");
clientSocket->write("Please provide a variable name\r\n");
}
}
else if (cmd == "exit") {
clientSocket->write("\r\x1B[K");
clientSocket->write("Goodbye! Exiting shell...\r\n");
clientSocket->flush();
clientSocket->disconnectFromHost();
clientSocket->waitForDisconnected();
return;
}
else {
clientSocket->write("\r\x1B[K> ");
clientSocket->write("Unknown command\r\n");
clientSocket->flush();
}
// 命令处理结束后,打印提示符
printPrompt(clientSocket);
}
/**
 * @brief Up arrow: step one entry back in the command history.
 *
 * When an older entry exists it becomes the current input line and the
 * client's line is redrawn as "> " followed by that entry. Otherwise nothing
 * happens.
 *
 * @param clientSocket Client whose line is redrawn.
 */
void handleUpArrow(QTcpSocket *clientSocket) {
    if (commandHistory.isEmpty() || historyIndex <= 0) {
        return; // no older entry to show
    }
    --historyIndex;
    currentCommand = commandHistory.at(historyIndex);
    // CR returns to column 0, ESC[K erases the old line content.
    clientSocket->write("\r\x1B[K> ");
    clientSocket->write(currentCommand.toUtf8());
    clientSocket->flush();
}
/**
 * @brief Down arrow: step one entry forward in the command history.
 *
 * If a newer entry exists it becomes the current input line and is redrawn
 * after "> ". If the index is already on the newest entry, the index moves
 * past the end, the input line is cleared and an empty prompt is redrawn.
 * In any other state nothing happens.
 *
 * @param clientSocket Client whose line is redrawn.
 */
void handleDownArrow(QTcpSocket *clientSocket) {
    const int newestIndex = commandHistory.size() - 1;
    const bool hasNewerEntry = !commandHistory.isEmpty() && historyIndex < newestIndex;

    if (hasNewerEntry) {
        ++historyIndex;
        currentCommand = commandHistory.at(historyIndex);
        clientSocket->write("\r\x1B[K> ");
        clientSocket->write(currentCommand.toUtf8());
        clientSocket->flush();
        return;
    }
    if (historyIndex == newestIndex) {
        // Leaving the newest entry: back to an empty input line.
        historyIndex = commandHistory.size();
        currentCommand.clear();
        clientSocket->write("\r\x1B[K> ");
        clientSocket->flush();
    }
}
private:
// Listening server; created by startServer(), destroyed by stopServer(). NULL when not running.
QTcpServer *server;
// 60-second timer that drives doPeriodicTask(); created/destroyed together with the server.
QTimer *timer;
// Copied from G_TEST_NUM at construction, changed via setTestNum(); doPeriodicTask() acts only when it is non-zero.
int TEST_NUM;
// Held by setTestNum() and doPeriodicTask() while they access TEST_NUM.
QMutex mutex;
// Command history
QList<QString> commandHistory; // stored history entries
int historyIndex; // index of the history entry currently shown
QString currentCommand; // the line currently being typed
// viewlog state: set to true when the active client sends '`'
bool stopViewLog;
// ping / telnettest state: cleared by the "telnettest" command, set to true when the active client sends '`'
bool g_stopTelnetTest;
// Client that may stop viewlog/telnettest with '`'. Initialised to NULL and only compared
// in this header; presumably assigned in code defined elsewhere - confirm.
QTcpSocket* activeClient;
};
// lnk20241213
/**
 * @brief QThread subclass whose thread body, run(), is implemented outside this header.
 *
 * NOTE(review): the name suggests a message-queue consumer loop - confirm in the run() definition.
 */
class mqconsumerThread : public QThread
{
protected:
    /// Thread entry point; replaces QThread::run().
    virtual void run();
};
/**
 * @brief Timer thread: a QThread subclass whose run() is implemented outside this header.
 */
class OnTimerThread : public QThread
{
protected:
    /// Thread entry point; replaces QThread::run().
    virtual void run();
};
//WW 2023-08-22 end
/////////////////////////////////////////////////////////////////////////
extern "C" {
#endif
// lnk20250106: added the ledger structures
// Typedefs so the structs below can be named without the `struct` keyword.
typedef struct terminal terminal;
typedef struct monitor monitor;
// Monitoring-point ledger record. Every field is a fixed-size char buffer.
struct monitor // monitoring-point ledger
{
char monitor_id[64];
char terminal_code[64];
char monitor_name[64];
char logical_device_seq[64];
char voltage_level[64];
char terminal_connect[64];
char timestamp[64];
char status[255];
};
// Terminal ledger record: fixed-size char buffers plus an embedded array of
// monitoring-point records.
struct terminal // terminal ledger
{
char terminal_id[64];
char terminal_code[64];
char org_name[64];
char maint_name[64];
char station_name[64];
char tmnl_factory[64];
char tmnl_status[64];
char dev_type[64];
char dev_key[255];
char dev_series[255];
char processNo[64]; // lnk20250210: process number
char addr_str[64];
char port[64];
char timestamp[64];
monitor line[10]; // at most 10 monitoring points
};
#ifdef __cplusplus
}
#endif
/////////////////////////////////////////////////////////////////////////////
#endif //SAVE2DB_8ue3hy0923r_H