Files
microser/json/save2json.h
2025-01-16 16:17:01 +08:00

345 lines
9.5 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file: $RCSfile: save2json.h,v $
* @brief: $IEC 61850 Protocol
*
* @version: $Revision: 1.5 $
* @date: $Date: 2018/12/23 12:39:52 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: save2json.h,v 1.5 2018/12/23 12:39:52 lizhongming Exp $
*
*/
#ifndef SAVE2DB_8ue3hy0923r_H
#define SAVE2DB_8ue3hy0923r_H
#ifdef __cplusplus
extern "C" {
#endif
extern int g_front_seg_index;
#ifdef __cplusplus
}
#endif
#ifdef __cplusplus
#include <string>
#include <vector>
#include <map>
#include <list>
#include <QThread>
//lnk20250106
#include "../include/rocketmq/SimpleProducer.h"
#include <QTcpServer>
#include <QTcpSocket>
#include <QMutex>
#include <QMutexLocker>
#include <QDebug>
#include <QTimer>
#include <QCoreApplication>
#include <QStringList>
#include <iostream>
extern int G_TEST_NUM;
extern void ledger(const char* terminal_id = NULL,QIODevice* outputDevice = NULL);
extern int TEST_PORT;
//////////////////////////////////////////////////////////////////////////////
//struct json_pair_info
//{
// string topic; //<Topic name="HISDATA" desc="历史数据"> "RTDATA" 实时数据 "RTDATASOE"实时SOE事件 等
// string data_type; //数据类型01/04:稳态/召测稳态02/05:闪变/召测闪变03/06:暂态/召测暂态
// string item; //<Item name="V" desc="电压" type="4" > 或"I" "PQ" 等
// string sequence; //"A" 或 "B" 或 "C" 或 "T"
// string name; //json的key
// int type; //6-值索引、9-实时SOE事件
// string mms_ref; //mms地址字符串
// float coeff; //Coefficient:数据系数
// unsigned short PltFlag; //0xffff
//};
/////////////////////////////////////////////////////////////////////////////
class KafkaSendThread : public QThread
{
// Q_OBJECT
//public:
protected:
void run();
};
//WW 2023-08-22 增加数据库线程和WebSocket线程
class SQLExcuteThread : public QThread
{
protected:
void run();
};
class WebSocketThread : public QThread
{
protected:
void run();
};
//lnk20241029
class WebhttpThread : public QThread
{
protected:
void run();
};
class httpThread : public QThread
{
protected:
void run();
};
//lnk20241202
/*class mqtestThread : public QThread
{
protected:
void run();
};*/
//lnk20250106
//使用telnet 127.0.0.1 12345进入测试
class Worker : public QObject
{
Q_OBJECT
public:
Worker(QObject *parent = NULL) : QObject(parent), server(NULL), TEST_NUM(G_TEST_NUM) {}
~Worker() {
if (server) {
server->close();
delete server;
}
}
public slots:
void startServer() {
server = new QTcpServer();
connect(server, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
if (!server->listen(QHostAddress::Any, TEST_PORT)) {
std::cout << "Server failed to start!" << std::endl;
qDebug() << "Server failed to start!";
delete server;
server = NULL;
emit serverError();
return;
} else {
std::cout << "Server is running on port " << TEST_PORT << std::endl;
qDebug() << QString("Server is running on port %1").arg(TEST_PORT);
}
// 创建并启动定时器
timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), this, SLOT(doPeriodicTask()));
timer->start(60000); // 每60秒触发一次
std::cout << "Timer started, event loop running in thread: " << QThread::currentThreadId() << std::endl;
qDebug() << "Timer started, event loop running in thread:" << QThread::currentThreadId();
}
void setTestNum(int num) {
QMutexLocker locker(&mutex);
TEST_NUM = num;
}
private slots:
void doPeriodicTask() {
QMutexLocker locker(&mutex);
std::cout << "Executing TEST_NUM is " << TEST_NUM << std::endl;
qDebug() << "doPeriodicTask() called. TEST_NUM = " << TEST_NUM;
if (TEST_NUM != 0) {
qDebug() << "Executing rocketmq_test_300()";
std::cout << "Executing rocketmq_test_300()\n";
rocketmq_test_300(TEST_NUM, g_front_seg_index);
}
}
void onNewConnection() {
if (!server) return;
QTcpSocket *clientSocket = server->nextPendingConnection();
qDebug() << "New connection established!";
std::cout << "New connection established!\n";
connect(clientSocket, SIGNAL(readyRead()), this, SLOT(onReadyRead()));
// 向客户端发送提示符
if (clientSocket) {
std::cout << "clientSocket OK\n";
clientSocket->write("Welcome to the test shell. Type 'help' for available commands.\n> ");
clientSocket->flush(); // 确保消息立即发送
}
}
void onReadyRead() {
QTcpSocket *clientSocket = qobject_cast<QTcpSocket *>(sender());
std::cout << "onReadyRead\n";
if (!clientSocket) {
std::cout << "Invalid socket\n";
return;
}
std::cout << "read all\n";
QByteArray data = clientSocket->readAll();
QString command = QString::fromUtf8(data).trimmed(); // 获取输入的命令并去除前后空格
qDebug() << "Received command:" << command;
std::cout << "Received command: " << command.toStdString() << "\n";
// 向客户端发送“指令已输入”反馈
clientSocket->write("Received command\n> ");
clientSocket->flush();
// 重新显示提示符
clientSocket->write("test_shell> ");
clientSocket->flush(); // 确保提示符立即显示
// 处理 TEST_NUM
if (command.startsWith("TEST_NUM=")) {
bool ok;
int num = command.mid(9).toInt(&ok); // 获取等号后面的数字部分
if (ok) {
setTestNum(num); // 更新 TEST_NUM
clientSocket->write("TEST_NUM updated\n> ");
std::cout << "TEST_NUM updated\n";
} else {
clientSocket->write("Invalid number\n> ");
std::cout << "Invalid number\n";
}
}
// 处理 rc 命令
else if (command.startsWith("rc")) {
qDebug() << "Executing rocketmq_test_rc()";
std::cout << "Executing rocketmq_test_rc()\n";
rocketmq_test_rc(); // 调用 rc 函数
clientSocket->write("Executed rocketmq_test_rc\n> ");
}
// 处理 rt 命令
else if (command.startsWith("rt")) {
qDebug() << "Executing rocketmq_test_rt()";
std::cout << "Executing rocketmq_test_rt()\n";
rocketmq_test_rt(); // 调用 rt 函数
clientSocket->write("Executed rocketmq_test_rt\n> ");
}
// 处理 ud 命令
else if (command.startsWith("ud")) {
qDebug() << "Executing rocketmq_test_ud()";
std::cout << "Executing rocketmq_test_ud()\n";
rocketmq_test_ud(); // 调用 ud 函数
clientSocket->write("Executed rocketmq_test_ud\n> ");
}
else if (command.startsWith("set")) {
qDebug() << "Executing rocketmq_test_set()";
std::cout << "Executing rocketmq_test_set()\n";
rocketmq_test_set(); // 调用 set 函数
clientSocket->write("Executed rocketmq_test_set\n> ");
}
else if (command.startsWith("ledger")) {
qDebug() << "Executing ledger()";
std::cout << "Executing ledger()\n";
// 提取参数
QStringList parts = command.split(" "); // 根据空格分割命令
if (parts.size() > 1) { // 如果命令中包含参数(即 id号
QString terminalId = parts[1];
std::cout << "Calling ledger with terminal_id: " << terminalId.toStdString() << std::endl;
// 修改:调用 ledger 时传递 clientSocket 作为输出设备
ledger(terminalId.toStdString().c_str(), clientSocket); // 调用带参数的 ledger传递 clientSocket
clientSocket->write("Executed ledger with terminal_id\n> ");
} else {
std::cout << "Calling ledger without parameters\n";
// 修改:调用无参数的 ledger传递 clientSocket
ledger(NULL, clientSocket); // 调用无参数的 ledger传递 clientSocket
clientSocket->write("Executed ledger without parameters\n> ");
}
}
// 处理未知命令
else {
clientSocket->write("Unknown command\n> ");
}
clientSocket->flush();
}
signals:
void serverError();
private:
QTcpServer *server;
QTimer *timer;
int TEST_NUM;
QMutex mutex;
};
//lnk20241213
class mqconsumerThread : public QThread
{
protected:
void run();
};
class OnTimerThread : public QThread//定时线程
{
protected:
void run();
};
//WW 2023-08-22 end
/////////////////////////////////////////////////////////////////////////
extern "C" {
#endif
//lnk20250106添加台账结构
typedef struct terminal terminal;
typedef struct monitor monitor;
struct monitor // 监测点台账
{
char monitor_id[64];
char terminal_code[64];
char monitor_name[64];
char logical_device_seq[64];
char voltage_level[64];
char terminal_connect[64];
char timestamp[64];
char status[255];
};
struct terminal // 终端台账
{
char terminal_id[64];
char terminal_code[64];
char org_name[64];
char maint_name[64];
char station_name[64];
char tmnl_factory[64];
char tmnl_status[64];
char dev_type[64];
char dev_key[255];
char dev_series[255];
char addr_str[64];
char port[64];
char timestamp[64];
monitor line[10]; // 最多 10 个监测点
};
#ifdef __cplusplus
}
#endif
/////////////////////////////////////////////////////////////////////////////
#endif //SAVE2DB_8ue3hy0923r_H