// File: microser/json/save2json.cpp (C++, 2618 lines, 80 KiB)
/**
* @file: $RCSfile: save2json.cpp,v $
* @brief: $IEC 61850 Protocol
*
* @version: $Revision: 1.21 $
* @date: $Date: 2020/10/27 08:51:07 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: save2json.cpp,v 1.21 2020/10/27 08:51:07 lizhongming Exp $
*
*/
using namespace std;
#include <iostream>
#include <stdio.h>
#include <assert.h>
#include <fstream> // std::filebuf
#include <string.h>
#include <sstream>
#include "qdebug.h"
#include <QSettings>
#include <QDateTime>
#include <QDir>
2025-05-09 16:53:07 +08:00
#include <QMap>//CZY 2023-08-17 WW 2023年3月13日17:21:02 增加多ICD支持
2025-01-16 16:17:01 +08:00
#include <apr_uuid.h>
#include <apr_strings.h>
2025-05-09 16:53:07 +08:00
#include "../log4cplus/log4.h"//lnk添加log4
2025-01-16 16:17:01 +08:00
#include "../mms/db_interface.h"
#include "../json/save2json.h"
#include "../json/mms_json_inter.h"
#include "kafka_producer.h"
2025-05-09 16:53:07 +08:00
#include "../rocketmq/CPushConsumer.h"
2025-01-16 16:17:01 +08:00
#include <vector>
2025-05-09 16:53:07 +08:00
#include "../json/cjson.h" //解json
#include <sstream> //创建xml
#include <fstream> //创建xml
2025-01-16 16:17:01 +08:00
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type);
extern int recall_json_handle(const char* jstr);
extern std::string intToString(int number);
int StringToInt(const std::string& str);
extern pthread_mutex_t mtx;//lnk20250115
2025-01-16 19:16:26 +08:00
2025-01-16 16:17:01 +08:00
#ifdef __cplusplus
extern "C" {
2025-05-09 16:53:07 +08:00
//解决编译lnk20250509
#define thisFileName __FILE__
2025-01-16 16:17:01 +08:00
#include "../mms/rdb_client.h"
#include "node.h"//lnk20241223
#include "mvl_defs.h"
#include "mms_vvar.h"
#endif /* __cplusplus */
extern unsigned int g_node_id;
extern int g_front_seg_index;
extern char subdir[128];
extern int comtrade_remain_file_num;
extern node_t* g_node; //lnk20241223
extern LD_info_t* find_LD_info_only_from_mp_id(char* mp_id);//lnk20241223
2025-01-16 19:16:26 +08:00
extern void print_terminal(const terminal* tmnl);
2025-01-16 16:17:01 +08:00
#ifdef __cplusplus
}
#endif
#ifndef nullptr
#define nullptr NULL
#endif
2025-05-20 16:31:12 +08:00
extern uint32_t g_mqproducer_blocked_times;
2025-03-04 17:29:04 +08:00
extern int INITFLAG;
2025-05-12 16:43:42 +08:00
extern std::string FRONT_INST;
2025-01-16 16:17:01 +08:00
extern QMutex kafka_data_list_mutex;
extern QList<Ckafka_data_t> kafka_data_list;
extern QMutex oss_data_list_mutex;
extern QList<oss_data_t> oss_data_list;
extern int FILE_FLAG;
// Global instance of the sender thread whose run() drains kafka_data_list.
KafkaSendThread myThrd;
2025-05-09 16:53:07 +08:00
// WW 2023-08-22: added the database thread and the WebSocket thread
WebSocketThread socketThrd; // WebSocket thread object
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
WebhttpThread webhttpThrd; // Web HTTP thread object (lnk202411)
httpThread httpThrd; // HTTP thread object (lnk202411); NOTE(review): original comment was identical to the line above - confirm how the two differ
mqconsumerThread mqconsumerThrd; // MQ consumer thread (lnk20241213)
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
OnTimerThread onTimerThrd; // timer thread
2025-04-30 10:22:57 +08:00
2025-01-16 16:17:01 +08:00
extern int FILE_FLAG;
extern int SEND_FLAG;
extern char* BROKER_LIST;
extern char* TOPIC_STAT;
extern char* TOPIC_PST;
extern char* TOPIC_PLT;
extern char* TOPIC_EVENT;
extern char* TOPIC_ALARM;
extern char* TOPIC_SNG;
extern char* TOPIC_RTDATA;//lnk20241220
extern char* UDS_UPLOAD_URL;
2025-05-09 16:53:07 +08:00
extern char g_onlyIP[255]; //直连某个IP仅仅为方便测试
2025-01-16 16:17:01 +08:00
//WW 2023-08-22 end
2025-05-09 16:53:07 +08:00
//lnk20241216添加mq消费者
2025-01-16 16:17:01 +08:00
extern std::string G_MQCONSUMER_IPPORT;//rocketmq ip+port
extern std::string G_MQCONSUMER_TOPIC_RT;//topie_realtimedata
extern std::string G_MQCONSUMER_TAG_RT;//tag
extern std::string G_MQCONSUMER_KEY_RT;//key
extern std::string G_MQCONSUMER_TOPIC_UD;//topie_update
extern std::string G_MQCONSUMER_TAG_UD;//tag
extern std::string G_MQCONSUMER_KEY_UD;//key
extern std::string G_MQCONSUMER_TOPIC_RC;//topie_recall
extern std::string G_MQCONSUMER_TAG_RC;//tag
extern std::string G_MQCONSUMER_KEY_RC;//key
extern std::string G_MQCONSUMER_TOPIC_SET;//topie_recall
extern std::string G_MQCONSUMER_TAG_SET;//tag
extern std::string G_MQCONSUMER_KEY_SET;//key
2025-02-26 16:39:10 +08:00
extern std::string G_MQCONSUMER_TOPIC_LOG;//topie_log
extern std::string G_MQCONSUMER_TAG_LOG;//tag
extern std::string G_MQCONSUMER_KEY_LOG;//key
extern std::string G_LOG_TOPIC;//topie
extern std::string G_LOG_TAG;//tag
extern std::string G_LOG_KEY;//key
// Global flag, initially false. NOTE(review): presumably toggles echoing output to the shell; it is not used in the visible part of this file - confirm against its users.
bool showinshellflag =false;
2025-01-16 16:17:01 +08:00
// 8 * 3600 * 1,000,000: eight hours expressed in microseconds.
#define APRTIME_8H (28800000000ULL)
// 3600 * 1,000,000: one hour expressed in microseconds.
#define APRTIME_1H (3600000000ULL)
///////////////////////////////////////////////////////////////////////////////
// Upper bound on the number of timestamps remembered per report in
// real_data_report_map; the oldest entry is dropped once this is exceeded.
const int MAX_LIST_SIZE = 16;
2025-06-24 17:15:18 +08:00
// Previous single-level layout, superseded by the nested map below:
//static QMap<int, QList<long long> > real_data_report_map;
// Timestamp lists of received real-time reports, keyed first by monitoring
// point (line_id) and then by report number (lnk20250624).
static QMap<int, QMap<int, QList<long long>>> real_data_report_map;
2025-05-09 16:53:07 +08:00
// CZY 2023-08-17 / WW 2023-03-13: map extended to hold the data of each line.
static QMap<QString, json_block_data*> json_data_map;
// CZY 2023-09-11: map holding the flicker data of each line.
static QMap<QString, json_block_data*> json_flicker_data_map;
// CZY 2023-09-11: map holding the PST data of each line.
// NOTE(review): the original comment was copied from the flicker map above - confirm what "pst" data is.
static QMap<QString, json_block_data*> json_pst_data_map;
/**
 * @brief Tell whether a string is empty or contains only whitespace.
 *
 * @param str String to inspect.
 * @return true if every character is whitespace (or the string is empty),
 *         false as soon as a non-whitespace character is found.
 */
bool is_blank(const std::string& str)
{
    for (std::string::const_iterator it = str.begin(); it != str.end(); ++it)
    {
        // The <cctype> classifiers are only defined for values representable
        // as unsigned char (or EOF). Bytes of multi-byte text are negative
        // when char is signed, so convert before classifying.
        if (!std::isspace(static_cast<unsigned char>(*it))) {
            return false;
        }
    }
    return true;
}
///////////////////////////////////////////////////////////////////////////////////
2025-01-16 16:17:01 +08:00
2025-06-24 17:15:18 +08:00
int urcbRealDataHasReceived(int dev_index, int rptNo, LD_info_t* LD_info, long long Time) //增加报告入参lnk20250624
2025-01-16 16:17:01 +08:00
{
2025-06-24 17:15:18 +08:00
QList<long long>& ts_list = real_data_report_map[LD_info->line_id][rptNo];
2025-05-09 16:53:07 +08:00
bool bFind = ts_list.contains(Time); //实时数据时间链表
2025-01-16 16:17:01 +08:00
if (bFind == false) {
ts_list.append(Time);
if (ts_list.size() > MAX_LIST_SIZE)
ts_list.removeFirst();
2025-05-09 16:53:07 +08:00
//lnk20241223每收到一次实时数据就检查一下数量
2025-01-16 16:17:01 +08:00
int real_report_count = 0;
2025-06-24 17:15:18 +08:00
//real_report_count = get_real_report_count(LD_info);
real_report_count = LD_info->rptinfo[rptNo]->count;//lnk20250624
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
//调试
2025-01-16 16:17:01 +08:00
std::cout << "real_report_count is" << real_report_count << std::endl;
std::cout << "mp limit is" << LD_info->limit << std::endl;
if(real_report_count >= LD_info->limit){
std::cout << "real_report_count reach limit!!!"<< std::endl;
2025-05-09 16:53:07 +08:00
//生成delete.xml
2025-01-16 16:17:01 +08:00
if (!createXmlFile(dev_index, LD_info->line_id, 0, 0, 0,"delete")) {
std::cerr << "Failed to create delete XML file!!!." << std::endl;
}
}
2025-05-09 16:53:07 +08:00
return 0; //没有重复数据
2025-01-16 16:17:01 +08:00
}
else
2025-05-09 16:53:07 +08:00
return 1; //有重复数据
2025-01-16 16:17:01 +08:00
}
//////////////////////////////////////////////////////////////////////////
void add_comm_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
if (g_node_id == STAT_DATA_BASE_NODE_ID)
com_log_fn += "comm_100_stat.txt";
else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID)
com_log_fn += "comm_200_3s.txt";
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID)
com_log_fn += "comm_300_comtrade.txt";
else if (g_node_id == HIS_DATA_BASE_NODE_ID)
com_log_fn += "comm_400_his.txt";
else if (g_node_id == NEW_HIS_DATA_BASE_NODE_ID) {
com_log_fn.append(QString("comm_400_his_%1.txt").arg(g_front_seg_index));
}
else if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID)
com_log_fn += "comm_600_recall.txt";
else if (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {
com_log_fn.append(QString("comm_700_allrecall_%1.txt").arg(g_front_seg_index));
}
else
com_log_fn += "comm_x00_unknown.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
void add_sng_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
com_log_fn += "sng_kafka_json.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
void add_stat_kafka_json_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
com_log_fn += "stat_kafka_json.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
//////////////////////////////////////////////////////////////////////////
2025-05-09 16:53:07 +08:00
/*新增rocketmq发送数据lnk10-10*/
2025-01-16 16:17:01 +08:00
void my_rocketmq_send(Ckafka_data_t& data)
{
static std::string topic;
static std::string cfg_His_tp;
static std::string cfg_PLT_tp;
static std::string cfg_PST_tp;
static std::string cfg_Evt_tp;
static std::string cfg_Alm_tp;
static std::string cfg_Rt_tp;
static bool init = false;
if (!init) {
2025-01-16 16:17:01 +08:00
cfg_His_tp = TOPIC_STAT;
cfg_PLT_tp = TOPIC_PLT;
cfg_PST_tp = TOPIC_PST;
cfg_Evt_tp = TOPIC_EVENT;
cfg_Alm_tp = TOPIC_ALARM;
cfg_Rt_tp = TOPIC_RTDATA;
init = true;
2025-01-16 16:17:01 +08:00
}
std::string key = data.mp_id.toStdString();
std::string senddata = data.strText.toStdString();
if (data.strTopic == "HISDATA")
{
topic = cfg_His_tp;
}
else if (data.strTopic == "PLT")
{
topic = cfg_PLT_tp;
}
else if (data.strTopic == "PST")
{
topic = cfg_PST_tp;
}
else if (data.strTopic == "Event")
{
topic = cfg_Evt_tp;
}
else if (data.strTopic == "Alm")
{
topic = cfg_Alm_tp;
}
else if (data.strTopic == "RTDATA")//lnk20241220
{
topic = cfg_Rt_tp;
}
else
{
topic = data.strTopic.toStdString();
2025-02-27 16:28:04 +08:00
2025-01-16 16:17:01 +08:00
}
if (g_onlyIP[0] != 0)
{
2025-05-09 16:53:07 +08:00
//单例模式
2025-01-16 16:17:01 +08:00
add_sng_log(data.strText.toAscii().data());
}
2025-06-23 10:02:56 +08:00
//rocketmq_producer_send(const_cast<char*>(senddata.c_str()),const_cast<char*>(topic.c_str()));
rocketmq_producer_send(senddata.c_str(), topic.c_str());//lnk20250623修复偶发性doublefree
2025-01-16 16:17:01 +08:00
}
/**
 * Send one queued record through the Kafka producer.
 *
 * On the first call the configured topic names are copied from the TOPIC_*
 * globals and, in GCC builds, the producer is initialised with BROKER_LIST.
 * The tag in data.strTopic selects the configured topic (unknown tags are
 * used verbatim). "Event" payloads are additionally written to the stat
 * kafka JSON log. The message key is data.monitor_id formatted as decimal.
 *
 * @param data Record to send.
 */
void my_kafka_send(Ckafka_data_t& data)
{
#ifdef __GNUC__
    static FeKafkaProducer kafkaProducer;
#endif
    static std::string topic;
    static std::string hisTopic;
    static std::string pltTopic;
    static std::string pstTopic;
    static std::string eventTopic;
    static std::string alarmTopic;
    static std::string sngTopic;
    static bool init = false;

    int retsize = -1;

    if (!init) {
        hisTopic = TOPIC_STAT;
        pltTopic = TOPIC_PLT;
        pstTopic = TOPIC_PST;
        eventTopic = TOPIC_EVENT;
        alarmTopic = TOPIC_ALARM;
        sngTopic = TOPIC_SNG;
        std::cout << hisTopic << std::endl;

        std::string brokerlist = BROKER_LIST;
#ifdef __GNUC__
        const bool producerReady = kafkaProducer.init(brokerlist);
        if (producerReady) {
            printf("kafka producer init success(%s)\n", brokerlist.c_str());
        }
        else {
            printf("kafka producer init Failed(%s)\n", brokerlist.c_str());
        }
#endif
        init = true;
    }

    // Message key: the monitor id as decimal text.
    char keyBuffer[256];
    apr_snprintf(keyBuffer, sizeof(keyBuffer), "%d", data.monitor_id);
    std::string key(keyBuffer);

    std::string senddata = data.strText.toStdString();

    if (data.strTopic == "HISDATA") {
        topic = hisTopic;
    }
    else if (data.strTopic == "PLT") {
        topic = pltTopic;
    }
    else if (data.strTopic == "PST") {
        topic = pstTopic;
    }
    else if (data.strTopic == "Event") {
        topic = eventTopic;
        add_stat_kafka_json_log(data.strText.toAscii().data());
    }
    else if (data.strTopic == "Alm") {
        topic = alarmTopic;
    }
    else {
        topic = data.strTopic.toStdString();
    }

    if (g_onlyIP[0] != 0) {
        add_sng_log(data.strText.toAscii().data());
    }

#ifdef __GNUC__
    retsize = kafkaProducer.send(senddata.c_str(), senddata.length(), topic, RdKafka::Topic::PARTITION_UA, &key);
#endif

    // In non-GCC builds nothing is sent and retsize keeps its initial -1.
    if (retsize > 0) {
        printf("\nkafka send, monitor_id:[%s] topic:[%s] Success,return length %d\n", key.c_str(), topic.c_str(), retsize);
    }
    else {
        printf("\nFailed kafka send, monitor_id:[%s] topic:[%s]\n", key.c_str(), topic.c_str());
    }
}
/**
 * Send one queued record through DataHub.
 *
 * The tag in data.strTopic selects one of the configured topic names (copied
 * from the TOPIC_* globals on the first call); unknown tags are used verbatim.
 * The "Success" line is printed unconditionally after the send call.
 *
 * @param data Record to send; mp_id is only used in the printed message.
 */
void my_datahub_send(Ckafka_data_t& data)
{
    static std::string topic;
    static std::string hisTopic;
    static std::string pltTopic;
    static std::string pstTopic;
    static std::string eventTopic;
    static std::string alarmTopic;
    static bool init = false;

    if (!init) {
        hisTopic = TOPIC_STAT;
        pltTopic = TOPIC_PLT;
        pstTopic = TOPIC_PST;
        eventTopic = TOPIC_EVENT;
        alarmTopic = TOPIC_ALARM;
        init = true;
    }

    std::string key = data.mp_id.toStdString();
    std::string senddata = data.strText.toStdString();

    if (data.strTopic == "HISDATA") {
        topic = hisTopic;
    }
    else if (data.strTopic == "PLT") {
        topic = pltTopic;
    }
    else if (data.strTopic == "PST") {
        topic = pstTopic;
    }
    else if (data.strTopic == "Event") {
        topic = eventTopic;
    }
    else if (data.strTopic == "Alm") {
        topic = alarmTopic;
    }
    else {
        topic = data.strTopic.toStdString();
    }

    if (g_onlyIP[0] != 0) {
        // Single-IP (direct connection) mode: also log the payload.
        add_sng_log(data.strText.toAscii().data());
    }

    DataHub_Send_Datahub(const_cast<char*>(topic.c_str()), const_cast<char*>(senddata.c_str()));
    printf("\ndatahub send, monitor_id:[%s] topic:[%s] Success\n", key.c_str(), topic.c_str());
}
/**
 * Join two C strings with a '-' separator.
 *
 * @param str1   Left part.
 * @param str2   Right part.
 * @param result Receives "<str1>-<str2>"; dereferenced unconditionally.
 */
void concatenate_and_separate(char str1[], char str2[], QString* result) {
    *result = QString(str1) + "-" + QString(str2);
}
void KafkaSendThread::run()
{
2025-05-09 16:53:07 +08:00
//线程开始创建生产者lnk20241211
2025-01-16 16:17:01 +08:00
InitializeProducer();
printf("\nKafkaSendThread::run() is called ...... \n\n");
while (1) {
Ckafka_data_t data;
bool data_gotten;
data_gotten = false;
kafka_data_list_mutex.lock();
if (!kafka_data_list.isEmpty()) {
data_gotten = true;
data = kafka_data_list.takeFirst();
}
kafka_data_list_mutex.unlock();
if (data_gotten) {
static uint32_t count = 0;
printf("BEGIN my_kafka_send no.%i -------->>>>>>>>>>>> %s \n", count,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
2025-05-09 16:53:07 +08:00
if (SEND_FLAG == 1) //kafka推送
2025-01-16 16:17:01 +08:00
{
my_kafka_send(data);
}
2025-05-09 16:53:07 +08:00
else if (SEND_FLAG == 2)//datahub推送
2025-01-16 16:17:01 +08:00
{
my_datahub_send(data);
}
2025-05-09 16:53:07 +08:00
else if (SEND_FLAG == 3)//rocketmq推送lnk10-11
2025-01-16 16:17:01 +08:00
{
my_rocketmq_send(data);
}
2025-05-09 16:53:07 +08:00
else //未配置 默认mq推送
2025-01-16 16:17:01 +08:00
{
my_rocketmq_send(data);
}
printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
}
2025-05-20 16:31:12 +08:00
//清空计数器
g_mqproducer_blocked_times =0;
2025-05-09 16:53:07 +08:00
QThread::msleep(10); // 避免 CPU 空转lnk20250326
2025-04-29 15:05:36 +08:00
}
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
//线程结束摧毁生产者
2025-01-16 16:17:01 +08:00
ShutdownAndDestroyProducer();//lnk20241211
}
2025-05-09 16:53:07 +08:00
//lnk20241213补招部分///////////////////////////////////////////////////////////////////////////////////////////////
// 提取 'data' 数组并返回为新的 JSON 字符串 (返回 std::string)
2025-01-16 16:17:01 +08:00
std::string extractDataJson(const char* inputJson) {
2025-05-09 16:53:07 +08:00
// 解析输入 JSON 字符串
2025-01-16 16:17:01 +08:00
cJSON* root = cJSON_Parse(inputJson);
if (root == NULL) {
std::cerr << "Error parsing JSON" << std::endl;
return "";
}
2025-05-09 16:53:07 +08:00
// 提取 "messageBody" 部分
2025-02-14 16:44:38 +08:00
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
2025-02-08 17:04:39 +08:00
cJSON_Delete(root);
2025-02-18 17:10:22 +08:00
return "";
2025-02-14 16:44:38 +08:00
}
2025-05-09 16:53:07 +08:00
// 解析 messageBody 中的 JSON 字符串
2025-02-17 16:58:14 +08:00
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
2025-02-18 17:10:22 +08:00
return "";
2025-02-17 16:58:14 +08:00
}
2025-05-09 16:53:07 +08:00
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
2025-02-14 16:44:38 +08:00
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
2025-02-18 17:10:22 +08:00
return "";
2025-02-08 17:04:39 +08:00
}
2025-05-12 16:43:42 +08:00
//添加guid
2025-05-09 16:53:07 +08:00
// 提取 "guid" 部分
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
2025-05-12 16:43:42 +08:00
if (guidstr == NULL || guidstr->type != cJSON_String) {
2025-05-09 16:53:07 +08:00
std::cerr << "'guid' is missing or is not an array" << std::endl;
cJSON_Delete(root);
return "";
}
2025-05-12 16:43:42 +08:00
//guid回复
std::string guid = guidstr->valuestring;
send_reply_to_kafka(guid,"1","收到补招指令");
2025-05-09 16:53:07 +08:00
// 提取 "data" 部分
2025-02-14 16:44:38 +08:00
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
2025-01-16 16:17:01 +08:00
if (data == NULL || data->type != cJSON_Array) {
std::cerr << "'data' is missing or is not an array" << std::endl;
cJSON_Delete(root);
return "";
}
2025-05-09 16:53:07 +08:00
// 创建新的 JSON 数组对象,只包含 "data" 部分
cJSON* newJson = cJSON_CreateArray(); // 创建一个新的数组
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
// 将 "data" 数组中的元素逐个添加到新数组中
2025-01-16 16:17:01 +08:00
cJSON* dataItem = NULL;
cJSON_ArrayForEach(dataItem, data) {
cJSON_AddItemToArray(newJson, cJSON_Duplicate(dataItem, 1));
}
2025-05-09 16:53:07 +08:00
// 将新的 JSON 数组转换为字符串
2025-01-16 16:17:01 +08:00
char* newJsonString = cJSON_Print(newJson);
if (newJsonString == NULL) {
std::cerr << "Error printing new JSON" << std::endl;
cJSON_Delete(root);
cJSON_Delete(newJson);
return "";
}
2025-05-09 16:53:07 +08:00
// 转换为 std::string 类型
2025-01-16 16:17:01 +08:00
std::string result(newJsonString);
2025-05-09 16:53:07 +08:00
// 清理内存
2025-01-16 16:17:01 +08:00
free(newJsonString);
cJSON_Delete(root);
cJSON_Delete(newJson);
2025-05-09 16:53:07 +08:00
return result; // 返回 std::string 类型的结果
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
//实时数据部分//////////////////////////////////////////////////////////////////////////////////////////////////////////
// 提取 JSON 消息中的相关字段
2025-01-16 16:17:01 +08:00
bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::string& line, bool& realData, bool& soeData, int& limit)
{
2025-05-09 16:53:07 +08:00
// 解析 JSON 数据
2025-01-16 16:17:01 +08:00
cJSON* root = cJSON_Parse(body.c_str());
if (root == NULL) {
std::cerr << "Failed to parse JSON message." << std::endl;
return false;
}
2025-05-09 16:53:07 +08:00
// 提取 "messageBody" 部分
2025-02-14 16:44:38 +08:00
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
2025-02-08 17:04:39 +08:00
cJSON_Delete(root);
2025-02-14 16:44:38 +08:00
return false;
}
2025-05-09 16:53:07 +08:00
// 解析 messageBody 中的 JSON 字符串
2025-02-17 16:58:14 +08:00
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return false;
}
2025-05-09 16:53:07 +08:00
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
2025-02-14 16:44:38 +08:00
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return false;
2025-02-08 17:04:39 +08:00
}
2025-05-09 16:53:07 +08:00
// 提取字段
2025-02-14 16:44:38 +08:00
cJSON* devSeriesItem = cJSON_GetObjectItem(messageBody, "devSeries");
cJSON* lineItem = cJSON_GetObjectItem(messageBody, "line");
cJSON* realDataItem = cJSON_GetObjectItem(messageBody, "realData");
cJSON* soeDataItem = cJSON_GetObjectItem(messageBody, "soeData");
cJSON* limitItem = cJSON_GetObjectItem(messageBody, "limit");
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
//添加guid
2025-05-12 16:43:42 +08:00
std::string guid;
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if(guidstr)guid = guidstr->valuestring;
2025-05-09 16:53:07 +08:00
2025-01-16 16:17:01 +08:00
if (devSeriesItem && lineItem && realDataItem && soeDataItem && limitItem) {
devSeries = devSeriesItem->valuestring;
line = lineItem->valuestring;
realData = realDataItem->valueint;
soeData = soeDataItem->valueint;
limit = limitItem->valueint;
2025-05-09 16:53:07 +08:00
//回复消息
2025-05-12 16:43:42 +08:00
//执行结果直接看实时数据不需要再回复1是收到消息
send_reply_to_kafka(guid,"1","收到三秒数据指令");
2025-05-09 16:53:07 +08:00
2025-01-16 16:17:01 +08:00
} else {
std::cerr << "Missing expected fields in JSON message." << std::endl;
cJSON_Delete(root);
return false;
}
2025-05-09 16:53:07 +08:00
cJSON_Delete(root); // 清理 JSON 对象
2025-01-16 16:17:01 +08:00
return true;
}
2025-05-09 16:53:07 +08:00
// Functions that build the XML content for the "new" and "delete" trigger files
2025-01-16 16:17:01 +08:00
/**
 * Build the XML document that registers a new trigger.
 *
 * @param devindex Value written to the DevSeries attribute.
 * @param mpindex  Value written to the Line attribute.
 * @param realData Written to RealData as "true"/"false".
 * @param soeData  Written to SOEData as "true"/"false".
 * @param limit    Value written to the Limit attribute.
 * @return The complete document text; Count is always "0".
 */
std::string createnewXmlContent(int devindex, int mpindex, bool realData, bool soeData, int limit)
{
    const char* const realFlag = realData ? "true" : "false";
    const char* const soeFlag = soeData ? "true" : "false";

    std::ostringstream doc;
    doc << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n";
    doc << "<Trigger3S>\n";
    doc << " <New>\n";
    doc << " <Trigger Line=\"" << mpindex << "\" ";
    doc << "RealData=\"" << realFlag << "\" ";
    doc << "DevSeries=\"" << devindex << "\" ";
    doc << "Limit=\"" << limit << "\" ";
    doc << "Count=\"0\" ";
    doc << "SOEData=\"" << soeFlag << "\"/>\n";
    doc << " </New>\n";
    doc << "</Trigger3S>\n";
    return doc.str();
}
/**
 * Build the XML document that removes a trigger.
 *
 * @param devindex Value written to the DevSeries attribute.
 * @param mpindex  Value written to the Line attribute.
 * @return The complete document text; RealData and SOEData are "false",
 *         Limit and Count are "0".
 */
std::string createdeleteXmlContent(int devindex, int mpindex)
{
    std::ostringstream doc;
    doc << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n";
    doc << "<Trigger3S>\n";
    doc << " <Delete>\n";
    doc << " <Trigger Line=\"" << mpindex << "\" ";
    doc << "RealData=\"false\" ";
    doc << "DevSeries=\"" << devindex << "\" ";
    doc << "Limit=\"0\" ";
    doc << "Count=\"0\" ";
    doc << "SOEData=\"false\"/>\n";
    doc << " </Delete>\n";
    doc << "</Trigger3S>\n";
    return doc.str();
}
2025-05-09 16:53:07 +08:00
/**
 * Write text to a file, replacing any previous content.
 *
 * @param filePath   Destination path.
 * @param xmlContent Text to write.
 * @return true only if the file was opened AND the content was written and
 *         flushed without a stream error; false otherwise (a message is
 *         printed to std::cerr).
 */
bool writeToFile(const std::string& filePath, const std::string& xmlContent)
{
    std::ofstream outFile(filePath.c_str()); // c_str(): works with pre-C++11 streams
    if (!outFile.is_open()) {
        std::cerr << "Failed to open file for writing: " << filePath << std::endl;
        return false;
    }

    outFile << xmlContent;
    outFile.close();

    // A failed write sets badbit and a failed close sets failbit; previously
    // neither was checked, so success was reported for a truncated or empty file.
    if (outFile.fail()) {
        std::cerr << "Failed to write file: " << filePath << std::endl;
        return false;
    }

    std::cout << "XML file created at: " << filePath << std::endl;
    return true;
}
2025-05-09 16:53:07 +08:00
// 创建并写入新的 XML 文件的主函数
2025-01-16 16:17:01 +08:00
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type)
{
std::string xmlContent = "";
std::string directory = "";
std::string filePath = "";
if(type == "new"){
2025-05-09 16:53:07 +08:00
// 构造 XML 内容
2025-01-16 16:17:01 +08:00
xmlContent = createnewXmlContent(devindex, mpindex, realData, soeData, limit);
2025-05-09 16:53:07 +08:00
// 设置文件路径
2025-01-16 16:17:01 +08:00
directory = "../etc/trigger3s/";
filePath = directory + "newtrigger.xml";
}
else if(type == "delete"){
2025-05-09 16:53:07 +08:00
// 构造 XML 内容
2025-01-16 16:17:01 +08:00
xmlContent = createdeleteXmlContent(devindex, mpindex);
2025-05-09 16:53:07 +08:00
// 设置文件路径
2025-01-16 16:17:01 +08:00
directory = "../etc/trigger3s/";
filePath = directory + "deletetrigger.xml";
}
else{
std::cerr << "Failed to create xmlfile,type error: " << std::endl;
return false;
}
2025-05-09 16:53:07 +08:00
// 创建目录(如果不存在)
2025-01-16 16:17:01 +08:00
if (system(("mkdir -p " + directory).c_str()) != 0) {
std::cerr << "Failed to create directory: " << directory << std::endl;
return false;
}
2025-05-09 16:53:07 +08:00
// 将 XML 内容写入文件
2025-01-16 16:17:01 +08:00
return writeToFile(filePath, xmlContent);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
2025-05-09 16:53:07 +08:00
//lnk20250108 process update section
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
// Closes the port this process is listening on.
extern int server_socket; // WebSocket server-side socket instance

/**
 * Close server_socket if it is open (i.e. not -1) and reset it to -1.
 * Prints a message to std::cout in either case.
 */
void close_listening_socket() {
    if (server_socket == -1) {
        std::cout << "No server socket to close!" << std::endl;
        return;
    }

    close(server_socket);
    std::cout << "Server socket closed successfully!" << std::endl;
    server_socket = -1; // mark as closed
}
2025-05-09 16:53:07 +08:00
/**
 * Validate a dotted-decimal IPv4 address.
 *
 * Accepts exactly four octets separated by '.', each made of 1 to 3 ASCII
 * digits, with a value of 0..255 and no leading zero ("01" is rejected,
 * "0" is accepted).
 *
 * Unlike the previous stream-based implementation, empty octets ("1..2.3"),
 * a leading dot and a trailing dot ("1.2.3.4.") are rejected, and arbitrarily
 * long digit runs cannot overflow the conversion.
 *
 * @param ip Text to validate.
 * @return true if ip is a well-formed IPv4 address, false otherwise.
 */
bool isValidIP(const std::string &ip) {
    int octets = 0;
    std::string::size_type pos = 0;

    for (;;) {
        const std::string::size_type dot = ip.find('.', pos);
        const std::string::size_type end = (dot == std::string::npos) ? ip.size() : dot;
        const std::string::size_type len = end - pos;

        // Each octet needs 1 to 3 characters; this also rejects empty parts.
        if (len == 0 || len > 3) {
            return false;
        }
        // No leading zeros (such as 01, 001).
        if (len > 1 && ip[pos] == '0') {
            return false;
        }

        int value = 0;
        for (std::string::size_type i = pos; i < end; ++i) {
            const char c = ip[i];
            // Plain range comparison: no locale dependence and no undefined
            // behaviour for negative char values.
            if (c < '0' || c > '9') {
                return false;
            }
            value = value * 10 + (c - '0');
        }
        if (value > 255) {
            return false;
        }

        if (++octets > 4) {
            return false;
        }
        if (dot == std::string::npos) {
            break;
        }
        pos = dot + 1;
    }

    return octets == 4;
}
2025-05-09 16:53:07 +08:00
/**
 * Tell whether a string can be placed on a shell command line as a single
 * argument without quoting: non-empty and made only of ASCII letters, digits,
 * '_', '-', '.' and ':'.
 */
static bool is_plain_shell_argument(const std::string& arg)
{
    if (arg.empty()) {
        return false;
    }
    for (std::string::size_type i = 0; i < arg.size(); ++i) {
        const char c = arg[i];
        const bool allowed = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
                             (c >= '0' && c <= '9') ||
                             c == '_' || c == '-' || c == '.' || c == ':';
        if (!allowed) {
            return false;
        }
    }
    return true;
}

/**
 * Run the process-control script in the background:
 *   /FeProject/bin/set_process.sh <fun> <process_num> <type> &
 *
 * The arguments end up on a shell command line, so any value that is not a
 * plain token is refused instead of being passed to the shell. The command is
 * built in a std::string, so long arguments are no longer truncated by a
 * fixed-size buffer.
 *
 * @param fun         First script argument.
 * @param process_num Second script argument, formatted as decimal.
 * @param type        Third script argument.
 */
void execute_bash(string fun,int process_num,string type)
{
    if (!is_plain_shell_argument(fun) || !is_plain_shell_argument(type)) {
        std::cerr << "execute_bash: refused, argument contains unsupported characters" << std::endl;
        return;
    }

    const char* script = "/FeProject/bin/set_process.sh"; // the script uses setsid to avoid keeping the port occupied

    std::ostringstream builder;
    builder << script << ' ' << fun << ' ' << process_num << ' ' << type << " &";
    const std::string command = builder.str();

    std::cout << "command:" << command <<std::endl;

    // system() returns -1 when the shell could not be started.
    if (system(command.c_str()) == -1) {
        std::cerr << "execute_bash: failed to start command" << std::endl;
    }
}

/**
 * Run the debug-control script in the background:
 *   /FeProject/bin/set_debug.sh <fun> <ip> <type> <proindex> &
 *
 * Arguments are validated the same way as in execute_bash.
 *
 * @param fun      First script argument.
 * @param ip       Second script argument.
 * @param type     Third script argument.
 * @param proindex Fourth script argument, formatted as decimal.
 */
void execute_bash_debug(string fun,string ip,string type,int proindex)
{
    if (!is_plain_shell_argument(fun) || !is_plain_shell_argument(ip) || !is_plain_shell_argument(type)) {
        std::cerr << "execute_bash_debug: refused, argument contains unsupported characters" << std::endl;
        return;
    }

    const char* script = "/FeProject/bin/set_debug.sh"; // the script uses setsid to avoid keeping the port occupied

    std::ostringstream builder;
    builder << script << ' ' << fun << ' ' << ip << ' ' << type << ' ' << proindex << " &";
    const std::string command = builder.str();

    std::cout << "command:" << command <<std::endl;

    // system() returns -1 when the shell could not be started.
    if (system(command.c_str()) == -1) {
        std::cerr << "execute_bash_debug: failed to start command" << std::endl;
    }
}
2025-05-30 15:40:20 +08:00
int parse_set(const std::string& json_str) {
2025-05-09 16:53:07 +08:00
// 解析 JSON 字符串
2025-01-16 16:17:01 +08:00
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
2025-05-30 15:40:20 +08:00
return 1;
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
// 提取 "messageBody" 部分
2025-02-14 16:44:38 +08:00
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
2025-02-08 17:04:39 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-08 17:04:39 +08:00
}
2025-05-09 16:53:07 +08:00
// 解析 messageBody 中的 JSON 字符串
2025-02-17 16:58:14 +08:00
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-17 16:58:14 +08:00
}
2025-05-09 16:53:07 +08:00
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
2025-02-14 16:44:38 +08:00
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-14 16:44:38 +08:00
}
2025-05-09 16:53:07 +08:00
// 获取 guid 字段
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
// 根据 guid 字段回复消息
std::string guid = guidstr->valuestring;
// 获取 code 字段
2025-02-14 16:44:38 +08:00
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
2025-01-16 16:17:01 +08:00
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
// 根据 code 字段值执行不同的解析逻辑
std::string code_str = code->valuestring;
cJSON* processNo = cJSON_GetObjectItem(messageBody, "processNo");
if (processNo == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
//判断是不是自己进程号:
int index_value = processNo->valueint;
cJSON* funtion = cJSON_GetObjectItem(messageBody, "fun");
if (funtion == nullptr) {
std::cout << "Missing 'fun' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
std::string fun = funtion->valuestring;
cJSON* front = cJSON_GetObjectItem(messageBody, "frontType");
if (front == nullptr) {
std::cout << "Missing 'frontType' in JSON." << std::endl;
2025-01-16 16:17:01 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
std::string frontType = front->valuestring;
2025-01-16 16:17:01 +08:00
2025-05-14 16:42:29 +08:00
if (index_value != g_front_seg_index && g_front_seg_index != 0) {
2025-01-16 16:17:01 +08:00
std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 0;
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
//进程号为0或者进程号匹配上
2025-01-16 16:17:01 +08:00
std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl;
2025-05-30 15:40:20 +08:00
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理topic:%s_%s的进程控制消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_SET.c_str());
2025-01-16 16:17:01 +08:00
if (code_str == "set_process") {
2025-05-09 16:53:07 +08:00
cJSON* num = cJSON_GetObjectItem(messageBody, "processNum");
if (num == nullptr) {
std::cout << "Missing 'processNum' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
int processNum = num->valueint;
//校验数据
if((fun == "reset" || fun == "add") &&
(processNum >=1 && processNum < 10) &&
(frontType == "stat" || frontType == "recall" || frontType == "all")){
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
// 调用执行脚本函数
if(fun == "reset"){
close_listening_socket();
}
execute_bash(fun, processNum, frontType);
DIY_WARNLOG_CODE("process",LOG_CODE_PROCESS_CONTROL,"【WARN】前置的%s%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
2025-05-30 15:40:20 +08:00
2025-05-09 16:53:07 +08:00
//脚本在3秒后执行
//回复消息
2025-05-12 16:43:42 +08:00
send_reply_to_kafka(guid,"1","收到重置进程指令,重启所有进程!");
2025-05-09 16:53:07 +08:00
//上送日志
2025-01-16 16:17:01 +08:00
std::cout << "this msg should only execute once" <<std::endl;
2025-05-09 16:53:07 +08:00
}
else{
std::cout << "only cfg_stat_data index 1 can control process,this process not handle this msg" << std::endl;
}
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
else if(fun == "delete"){
//等待一会后退出进程
MVL_LOG_ACSE0("MYLOG: recive delete msg, so exit to restart ");
2025-05-12 16:43:42 +08:00
//回复消息
send_reply_to_kafka(guid,"1","收到删除进程指令,这个进程将会重启 ");
//上送日志
DIY_WARNLOG_CODE("process",LOG_CODE_PROCESS_CONTROL,"【WARN】前置的%s%d号进程执行指令:%s,即将重启",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
2025-05-12 16:43:42 +08:00
2025-05-09 16:53:07 +08:00
apr_sleep(apr_time_from_sec(10));
2025-05-14 16:42:29 +08:00
::_exit(-1039); //进程退出
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
else{
std::cout << "param is not executable" <<std::endl;
}
2025-01-16 16:17:01 +08:00
}
2025-02-08 17:04:39 +08:00
else if (code_str == "set_debug"){
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
std::cout << "cfg_stat_data process" << g_front_seg_index <<" handle this msg" << std::endl;
2025-05-09 16:53:07 +08:00
cJSON* onlyip = cJSON_GetObjectItem(messageBody, "ip");
if (onlyip == nullptr) {
std::cout << "Missing 'ip' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
std::string ip = onlyip->valuestring;
2025-05-09 16:53:07 +08:00
cJSON* index_item = cJSON_GetObjectItem(messageBody, "proindex");
if (index_item == nullptr) {
std::cout << "Missing 'proindex' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
int proindex = index_item->valueint;
std::cout << "proindex is :" << proindex <<std::endl;
//校验数据
if((fun == "start" || fun == "delete") &&
isValidIP(ip) &&
(frontType == "stat" || frontType == "recall" || frontType == "realTime" || frontType == "comtrade") &&
(proindex >= 10 && proindex < 100)){ //单连测试用的进程号应该大于10小于100
execute_bash_debug(fun, ip, frontType,proindex);
2025-05-30 15:40:20 +08:00
DIY_WARNLOG("process","【WARN】前置的%s%d号进程执行指令:%s,start开启单连进程,delete杀死单连进程",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
2025-05-09 16:53:07 +08:00
}
else{
std::cout << "param is not executable" <<std::endl;
}
std::cout << "this msg should only execute once" <<std::endl;
2025-02-08 17:04:39 +08:00
}
else{
std::cout << "only cfg_stat_data index 1 can control process,this process not handle this msg" << std::endl;
}
}
2025-01-16 16:17:01 +08:00
else{
std::cout << "set process code str error" <<std::endl;
}
2025-05-09 16:53:07 +08:00
// 释放 JSON 对象
2025-01-16 16:17:01 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 0;
2025-01-16 16:17:01 +08:00
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
2025-05-09 16:53:07 +08:00
//lnk20250103台账更新部分
// 准备更新内容并生成 XML 字符串
/**
 * @brief Write the leading indentation for one line of generated XML.
 *
 * Appends one indent unit to @p stream per nesting level. A level of zero
 * or less writes nothing.
 *
 * @param stream Stream that receives the indentation.
 * @param level  Nesting depth of the line about to be written.
 */
void add_indent(std::stringstream& stream, int level) {
    int remaining = level;
    while (remaining > 0) {
        stream << " ";  // one indent unit per nesting level
        --remaining;
    }
}
2025-05-09 16:53:07 +08:00
std::string prepare_update(const std::string& code_str, const terminal& json_data,const std::string& guid) //添加guid
2025-01-16 16:17:01 +08:00
{
std::cout << "prepare update" << std::endl;
std::stringstream xmlStream;
2025-05-09 16:53:07 +08:00
int indentLevel = 0; // 缩进级别
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
// 根节点
2025-01-16 16:17:01 +08:00
add_indent(xmlStream, indentLevel);
xmlStream << "<ledger_update>" << std::endl;
indentLevel++;
2025-05-09 16:53:07 +08:00
// 添加 guid 节点
add_indent(xmlStream, indentLevel);
xmlStream << "<guid>" << guid << "</guid>" << std::endl;
2025-01-16 16:17:01 +08:00
if (code_str == "ledger_modify" || code_str == "add_terminal") {
2025-05-09 16:53:07 +08:00
// 如果是 modify 类型
2025-01-16 16:17:01 +08:00
if (code_str == "ledger_modify") {
add_indent(xmlStream, indentLevel);
xmlStream << "<modify>" << std::endl;
indentLevel++;
}
else {
add_indent(xmlStream, indentLevel);
xmlStream << "<add>" << std::endl;
indentLevel++;
}
2025-05-09 16:53:07 +08:00
// 添加数据部分
2025-01-16 16:17:01 +08:00
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalData>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<ip>" << json_data.addr_str << "</ip>" << std::endl; // Assuming `addr_str` for IP
add_indent(xmlStream, indentLevel);
xmlStream << "<devType>" << json_data.dev_type << "</devType>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<maintName>" << json_data.maint_name << "</maintName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<orgName>" << json_data.org_name << "</orgName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<port>" << json_data.port << "</port>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<stationName>" << json_data.station_name << "</stationName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalCode>" << json_data.terminal_code << "</terminalCode>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<updateTime>" << json_data.timestamp << "</updateTime>" << std::endl; // Assuming `timestamp`
add_indent(xmlStream, indentLevel);
xmlStream << "<manufacturer>" << json_data.tmnl_factory << "</manufacturer>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << json_data.tmnl_status << "</status>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<series>" << json_data.dev_series << "</series>" << std::endl;
2025-02-10 17:03:15 +08:00
//lnk20250210
add_indent(xmlStream, indentLevel);
xmlStream << "<processNo>" << json_data.processNo << "</processNo>" << std::endl;
2025-01-16 16:17:01 +08:00
add_indent(xmlStream, indentLevel);
xmlStream << "<devKey>" << json_data.dev_key << "</devKey>" << std::endl;
2025-05-09 16:53:07 +08:00
// monitorData 部分
2025-01-16 16:17:01 +08:00
for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) {
const monitor& monitor = json_data.line[i];
add_indent(xmlStream, indentLevel);
xmlStream << "<monitorData" << (i + 1) << ">" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << monitor.monitor_id << "</id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<name>" << monitor.monitor_name << "</name>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<lineNo>" << monitor.logical_device_seq << "</lineNo>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<voltageLevel>" << monitor.voltage_level << "</voltageLevel>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<ptType>" << monitor.terminal_connect << "</ptType>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<timestamp>" << monitor.timestamp << "</timestamp>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminal_code>" << monitor.terminal_code << "</terminal_code>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << monitor.status << "</status>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</monitorData>" << std::endl;
}
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</terminalData>" << std::endl;
2025-05-09 16:53:07 +08:00
// 结束 modify 或 add 标签
2025-01-16 16:17:01 +08:00
if (code_str == "ledger_modify") {
2025-01-16 19:16:26 +08:00
indentLevel--;
2025-01-16 16:17:01 +08:00
add_indent(xmlStream, indentLevel);
xmlStream << "</modify>" << std::endl;
2025-01-16 19:16:26 +08:00
2025-01-16 16:17:01 +08:00
}
else {
2025-01-16 19:16:26 +08:00
indentLevel--;
2025-01-16 16:17:01 +08:00
add_indent(xmlStream, indentLevel);
xmlStream << "</add>" << std::endl;
2025-01-16 19:16:26 +08:00
2025-01-16 16:17:01 +08:00
}
} else if (code_str == "delete_terminal") {
2025-05-09 16:53:07 +08:00
// 如果是 delete 类型
2025-01-16 16:17:01 +08:00
add_indent(xmlStream, indentLevel);
xmlStream << "<delete>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalData>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</terminalData>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</delete>" << std::endl;
}
else {
std::cerr << "code_str error" << std::endl;
return "";
}
2025-05-09 16:53:07 +08:00
// 结束根节点
2025-01-16 16:17:01 +08:00
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</ledger_update>" << std::endl;
2025-05-09 16:53:07 +08:00
return xmlStream.str(); // 返回构造的 XML 字符串
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
/**
 * @brief Convert the integer at the start of a string to an int.
 *
 * Uses stream extraction, so leading whitespace is skipped and any text
 * after the number is ignored.
 *
 * @param str Text to convert.
 * @return The parsed value, or 0 when extraction fails (a message is also
 *         written to std::cerr in that case).
 */
int StringToInt(const std::string& str) {
    std::istringstream reader(str);
    int parsed;

    if (reader >> parsed) {
        return parsed;
    }

    // Extraction failed: report it and fall back to 0.
    std::cerr << "Conversion failed!" << std::endl;
    return 0;
}
2025-05-09 16:53:07 +08:00
// 解析 JSON 字符串并执行相应操作
2025-05-30 15:40:20 +08:00
int parse_log(const std::string& json_str) {
2025-05-09 16:53:07 +08:00
// 解析 JSON 字符串
2025-02-25 16:33:11 +08:00
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
2025-05-30 15:40:20 +08:00
return 1;
2025-02-25 16:33:11 +08:00
}
2025-05-09 16:53:07 +08:00
// 提取 "messageBody" 部分
2025-02-25 16:33:11 +08:00
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-25 16:33:11 +08:00
}
2025-05-09 16:53:07 +08:00
// 解析 messageBody 中的 JSON 字符串
2025-02-25 16:33:11 +08:00
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-25 16:33:11 +08:00
}
2025-05-09 16:53:07 +08:00
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
2025-02-25 16:33:11 +08:00
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-25 16:33:11 +08:00
}
2025-05-09 16:53:07 +08:00
// 获取 guid 字段
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
// 根据 guid 字段回复消息
std::string guid = guidstr->valuestring;
// 获取 code 字段
2025-02-25 16:33:11 +08:00
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-25 16:33:11 +08:00
}
2025-05-09 16:53:07 +08:00
// 根据 code 字段值执行不同的解析逻辑
std::string code_str = code->valuestring;
//获取进程号
cJSON* process = cJSON_GetObjectItem(messageBody, "processNo");
if (process == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
2025-02-25 16:33:11 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-25 16:33:11 +08:00
}
2025-05-09 16:53:07 +08:00
//判断是不是自己进程号:
int processNo = process->valueint;
2025-02-25 16:33:11 +08:00
2025-05-09 16:53:07 +08:00
// 获取 id 字段
cJSON* idstr = cJSON_GetObjectItem(messageBody, "id");
if (idstr == nullptr) {
std::cout << "Missing 'id' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
std::string id = idstr->valuestring;
2025-02-25 16:33:11 +08:00
2025-05-09 16:53:07 +08:00
// 获取 level 字段
cJSON* levelstr = cJSON_GetObjectItem(messageBody, "level");
if (levelstr == nullptr) {
std::cout << "Missing 'level' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
std::string level = levelstr->valuestring;
// 获取 grade 字段
cJSON* gradestr = cJSON_GetObjectItem(messageBody, "grade");
if (gradestr == nullptr) {
std::cout << "Missing 'grade' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
std::string grade = gradestr->valuestring;
// 获取 logtype 字段
cJSON* logtypestr = cJSON_GetObjectItem(messageBody, "logtype");
if (logtypestr == nullptr) {
std::cout << "Missing 'logtype' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
std::string logtype = logtypestr->valuestring;
// 获取 frontType 字段
cJSON* frontTypestr = cJSON_GetObjectItem(messageBody, "frontType");
if (frontTypestr == nullptr) {
std::cout << "Missing 'frontType' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
std::string frontType = frontTypestr->valuestring;
if (processNo != g_front_seg_index) {
std::cout << "msg index:"<< processNo <<"doesnt match self index:" << g_front_seg_index << std::endl;
2025-02-25 16:33:11 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 0;
2025-02-25 16:33:11 +08:00
}
2025-05-09 16:53:07 +08:00
if (frontType != subdir) {
std::cout << "msg frontType:"<< frontType <<"doesnt match self frontType:" << subdir << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 0;
2025-05-09 16:53:07 +08:00
}
2025-02-25 16:33:11 +08:00
2025-05-30 15:40:20 +08:00
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理日志上送消息",get_front_msg_from_subdir(), g_front_seg_index);
2025-05-12 16:43:42 +08:00
//进程号和匹配上
2025-05-09 16:53:07 +08:00
std::cout << "msg index:"<< processNo <<" self index:" << g_front_seg_index << std::endl;
std::cout << "msg frontType:"<< frontType <<" self frontType:" << subdir << std::endl;
2025-02-25 16:33:11 +08:00
2025-05-12 16:43:42 +08:00
//回复消息
send_reply_to_kafka(guid,"1","收到实时日志指令");
2025-02-25 16:33:11 +08:00
if (code_str == "set_log") {
2025-05-09 16:53:07 +08:00
//校验数据
if((level == "terminal" || level == "measurepoint") &&
(grade == "NORMAL" || grade == "DEBUG") &&
(logtype == "com" || logtype == "data") &&
(!id.empty() && !is_blank(id))){
2025-02-25 16:33:11 +08:00
2025-05-09 16:53:07 +08:00
//开启开关
process_log_command(id, level, grade, logtype);
}
else{
std::cout << "type doesnt match" <<std::endl;
//记录warm
}
std::cout << "this msg should only execute once" <<std::endl;
2025-02-26 16:39:10 +08:00
}
2025-02-25 16:33:11 +08:00
2025-05-09 16:53:07 +08:00
// 释放 JSON 对象
2025-02-25 16:33:11 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 0;
2025-02-25 16:33:11 +08:00
}
2025-05-09 16:53:07 +08:00
// 台账更新不区分功能
2025-05-30 15:40:20 +08:00
int parse_control(const std::string& json_str, const std::string& output_dir) {
2025-05-09 16:53:07 +08:00
// 解析 JSON 字符串
2025-01-16 16:17:01 +08:00
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
2025-05-30 15:40:20 +08:00
return 1;
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
// 提取 "messageBody" 部分
2025-02-14 16:44:38 +08:00
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
2025-02-08 17:04:39 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-08 17:04:39 +08:00
}
2025-05-09 16:53:07 +08:00
// 解析 messageBody 中的 JSON 字符串
2025-02-17 16:58:14 +08:00
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-17 16:58:14 +08:00
}
2025-05-09 16:53:07 +08:00
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
2025-02-14 16:44:38 +08:00
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-02-14 16:44:38 +08:00
}
2025-05-09 16:53:07 +08:00
// 获取 code 字段
2025-02-14 16:44:38 +08:00
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
2025-01-16 16:17:01 +08:00
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
// 根据 code 字段值执行不同的解析逻辑
std::string code_str = code->valuestring;
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
//获取进程号
cJSON* process = cJSON_GetObjectItem(messageBody, "processNo");
if (process == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
//判断是不是自己进程号:
int process_No = process->valueint;
// 获取 guid 字段
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 1;
2025-05-09 16:53:07 +08:00
}
// 根据 guid 字段回复消息
std::string guid = guidstr->valuestring;
//进程号为0的进程处理所有台账更新消息
if (process_No != g_front_seg_index && g_front_seg_index !=0) {
std::cout << "msg index:"<< process_No <<"doesnt match self index:" << g_front_seg_index << std::endl;
2025-01-16 16:17:01 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 0;
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
//进程号为0或者进程号匹配上
std::cout << "msg index:"<< process_No <<" self index:" << g_front_seg_index << std::endl;
2025-01-16 16:17:01 +08:00
2025-05-30 15:40:20 +08:00
//记录日志
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理topic:%s_%s的台账更新消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_UD.c_str());
2025-05-09 16:53:07 +08:00
//匹配后响应收到台账更新消息
2025-05-12 16:43:42 +08:00
//除了回复收到消息,执行结束后还要回复结果
send_reply_to_kafka(guid,"1","收到台账更新指令");
2025-01-16 16:17:01 +08:00
if (code_str == "add_terminal" || code_str == "ledger_modify") {
2025-05-09 16:53:07 +08:00
std::cout << "add or update ledger" << std::endl;
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
// 解析 add_terminal 或 ledger_modify
2025-02-14 16:44:38 +08:00
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
2025-01-16 16:17:01 +08:00
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
terminal json_data;
2025-05-09 16:53:07 +08:00
// 填充 terminal_dev 的数据
2025-01-16 16:17:01 +08:00
cJSON* id = cJSON_GetObjectItem(item, "id"); // terminal_id
if (id && id->type == cJSON_String)
std::strncpy(json_data.terminal_id, id->valuestring, sizeof(json_data.terminal_id) - 1);
else
std::strncpy(json_data.terminal_id, "N/A", sizeof(json_data.terminal_id) - 1);
cJSON* name = cJSON_GetObjectItem(item, "name"); // terminal_code
if (name && name->type == cJSON_String)
std::strncpy(json_data.terminal_code, name->valuestring, sizeof(json_data.terminal_code) - 1);
else
std::strncpy(json_data.terminal_code, "N/A", sizeof(json_data.terminal_code) - 1);
cJSON* org_name = cJSON_GetObjectItem(item, "org_name"); // org_name
if (org_name && org_name->type == cJSON_String)
std::strncpy(json_data.org_name, org_name->valuestring, sizeof(json_data.org_name) - 1);
else
std::strncpy(json_data.org_name, "N/A", sizeof(json_data.org_name) - 1);
cJSON* maint_name = cJSON_GetObjectItem(item, "maint_name"); // maint_name
if (maint_name && maint_name->type == cJSON_String)
std::strncpy(json_data.maint_name, maint_name->valuestring, sizeof(json_data.maint_name) - 1);
else
std::strncpy(json_data.maint_name, "N/A", sizeof(json_data.maint_name) - 1);
cJSON* station_name = cJSON_GetObjectItem(item, "stationName"); // station_name
if (station_name && station_name->type == cJSON_String)
std::strncpy(json_data.station_name, station_name->valuestring, sizeof(json_data.station_name) - 1);
else
std::strncpy(json_data.station_name, "N/A", sizeof(json_data.station_name) - 1);
cJSON* manufacturer = cJSON_GetObjectItem(item, "manufacturer"); // tmnl_factory
if (manufacturer && manufacturer->type == cJSON_String)
std::strncpy(json_data.tmnl_factory, manufacturer->valuestring, sizeof(json_data.tmnl_factory) - 1);
else
std::strncpy(json_data.tmnl_factory, "N/A", sizeof(json_data.tmnl_factory) - 1);
cJSON* status = cJSON_GetObjectItem(item, "status"); // tmnl_status
if (status && status->type == cJSON_String)
std::strncpy(json_data.tmnl_status, status->valuestring, sizeof(json_data.tmnl_status) - 1);
else
std::strncpy(json_data.tmnl_status, "N/A", sizeof(json_data.tmnl_status) - 1);
cJSON* dev_type = cJSON_GetObjectItem(item, "devType"); // dev_type
if (dev_type && dev_type->type == cJSON_String)
std::strncpy(json_data.dev_type, dev_type->valuestring, sizeof(json_data.dev_type) - 1);
else
std::strncpy(json_data.dev_type, "N/A", sizeof(json_data.dev_type) - 1);
cJSON* dev_key = cJSON_GetObjectItem(item, "devKey"); // dev_key
if (dev_key && dev_key->type == cJSON_String)
std::strncpy(json_data.dev_key, dev_key->valuestring, sizeof(json_data.dev_key) - 1);
else
std::strncpy(json_data.dev_key, "N/A", sizeof(json_data.dev_key) - 1);
cJSON* dev_series = cJSON_GetObjectItem(item, "series"); // dev_series
if (dev_series && dev_series->type == cJSON_String)
std::strncpy(json_data.dev_series, dev_series->valuestring, sizeof(json_data.dev_series) - 1);
else
std::strncpy(json_data.dev_series, "N/A", sizeof(json_data.dev_series) - 1);
2025-05-09 16:53:07 +08:00
//lnk20250210台账进程号
cJSON* processNo = cJSON_GetObjectItem(item, "processNo"); // processNo转为字符串
2025-02-10 17:03:15 +08:00
if (processNo && processNo->type == cJSON_Number) snprintf(json_data.processNo, sizeof(json_data.processNo), "%d", processNo->valueint);
else strncpy(json_data.processNo, "N/A", sizeof(json_data.processNo) - 1);
2025-01-16 16:17:01 +08:00
cJSON* ip = cJSON_GetObjectItem(item, "ip"); // addr_str
if (ip && ip->type == cJSON_String)
std::strncpy(json_data.addr_str, ip->valuestring, sizeof(json_data.addr_str) - 1);
else
std::strncpy(json_data.addr_str, "N/A", sizeof(json_data.addr_str) - 1);
cJSON* port = cJSON_GetObjectItem(item, "port"); // port
if (port && port->type == cJSON_String)
std::strncpy(json_data.port, port->valuestring, sizeof(json_data.port) - 1);
else
std::strncpy(json_data.port, "N/A", sizeof(json_data.port) - 1);
cJSON* updateTime = cJSON_GetObjectItem(item, "updateTime"); // timestamp
if (updateTime && updateTime->type == cJSON_String)
std::strncpy(json_data.timestamp, updateTime->valuestring, sizeof(json_data.timestamp) - 1);
else
std::strncpy(json_data.timestamp, "N/A", sizeof(json_data.timestamp) - 1);
2025-05-09 16:53:07 +08:00
// monitorData 解析,填充到 line 数组中
2025-01-16 16:17:01 +08:00
cJSON* monitorData = cJSON_GetObjectItem(item, "monitorData");
if (monitorData != nullptr && monitorData->type == cJSON_Array) {
int monitorData_size = cJSON_GetArraySize(monitorData);
2025-05-09 16:53:07 +08:00
for (int j = 0; j < monitorData_size && j < 10; j++) { // 最多 10 个监测点
2025-01-16 16:17:01 +08:00
cJSON* monitor_item = cJSON_GetArrayItem(monitorData, j);
monitor monitor_data;
cJSON* monitor_id = cJSON_GetObjectItem(monitor_item, "id"); // monitor_id
if (monitor_id && monitor_id->type == cJSON_String)
std::strncpy(monitor_data.monitor_id, monitor_id->valuestring, sizeof(monitor_data.monitor_id) - 1);
else
std::strncpy(monitor_data.monitor_id, "N/A", sizeof(monitor_data.monitor_id) - 1);
cJSON* monitor_name = cJSON_GetObjectItem(monitor_item, "name"); // monitor_name
if (monitor_name && monitor_name->type == cJSON_String)
std::strncpy(monitor_data.monitor_name, monitor_name->valuestring, sizeof(monitor_data.monitor_name) - 1);
else
std::strncpy(monitor_data.monitor_name, "N/A", sizeof(monitor_data.monitor_name) - 1);
cJSON* voltage_level = cJSON_GetObjectItem(monitor_item, "voltageLevel"); // voltage_level
if (voltage_level && voltage_level->type == cJSON_String)
std::strncpy(monitor_data.voltage_level, voltage_level->valuestring, sizeof(monitor_data.voltage_level) - 1);
else
std::strncpy(monitor_data.voltage_level, "N/A", sizeof(monitor_data.voltage_level) - 1);
cJSON* monitor_status = cJSON_GetObjectItem(monitor_item, "status"); // status
if (monitor_status && monitor_status->type == cJSON_String)
std::strncpy(monitor_data.status, monitor_status->valuestring, sizeof(monitor_data.status) - 1);
else
std::strncpy(monitor_data.status, "N/A", sizeof(monitor_data.status) - 1);
2025-01-16 19:16:26 +08:00
cJSON* lineNo = cJSON_GetObjectItem(monitor_item, "lineNo"); // logical_device_seq
2025-01-16 16:17:01 +08:00
if (lineNo && lineNo->type == cJSON_String)
2025-01-16 19:16:26 +08:00
2025-01-16 16:17:01 +08:00
std::strncpy(monitor_data.logical_device_seq, lineNo->valuestring, sizeof(monitor_data.logical_device_seq) - 1);
2025-01-16 19:16:26 +08:00
else
2025-01-16 16:17:01 +08:00
std::strncpy(monitor_data.logical_device_seq, "N/A", sizeof(monitor_data.logical_device_seq) - 1);
2025-01-16 19:16:26 +08:00
cJSON* ptType = cJSON_GetObjectItem(monitor_item, "ptType"); // terminal_connect
2025-01-16 16:17:01 +08:00
if (ptType && ptType->type == cJSON_String)
2025-01-16 19:16:26 +08:00
2025-01-16 16:17:01 +08:00
std::strncpy(monitor_data.terminal_connect, ptType->valuestring, sizeof(monitor_data.terminal_connect) - 1);
2025-01-16 19:16:26 +08:00
else
2025-01-16 16:17:01 +08:00
std::strncpy(monitor_data.terminal_connect, "N/A", sizeof(monitor_data.terminal_connect) - 1);
std::strncpy(monitor_data.timestamp, json_data.timestamp, sizeof(monitor_data.timestamp) - 1);
std::strncpy(monitor_data.terminal_code, json_data.terminal_code, sizeof(monitor_data.terminal_code) - 1);
2025-05-09 16:53:07 +08:00
// 填充到 line 数组
2025-01-16 16:17:01 +08:00
json_data.line[j] = monitor_data;
}
}
2025-01-16 19:16:26 +08:00
print_terminal(&json_data);
2025-05-09 16:53:07 +08:00
// 准备 XML 内容并写入文件
std::string xmlContent = prepare_update(code_str, json_data,guid);//添加guid20250506
2025-01-16 16:17:01 +08:00
if (xmlContent != "") {
std::cout << "write to xml in /FeProject/etc/ledger_update" <<std::endl;
char nodeid[20];
2025-05-09 16:53:07 +08:00
std::sprintf(nodeid, "%u", g_node_id); // "%u" 用于 unsigned int
2025-01-16 16:17:01 +08:00
std::string nodeid_str(nodeid);
std::string frontindex_str = intToString(g_front_seg_index);
std::string file_name = output_dir + "/" + nodeid_str + "_" + frontindex_str + "_" + json_data.terminal_id + "_" + code_str + ".xml";
writeToFile(file_name, xmlContent);
}
}
}
}
else if (code_str == "delete_terminal") {
std::cout << "delete ledger" <<std::endl;
2025-05-09 16:53:07 +08:00
// 解析 delete_terminal
2025-02-14 16:44:38 +08:00
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
2025-01-16 16:17:01 +08:00
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
2025-05-09 16:53:07 +08:00
// 只解析 id 字段
2025-01-16 16:17:01 +08:00
cJSON* id = cJSON_GetObjectItem(item, "id");
if (id != nullptr) {
terminal json_data;
std::strncpy(json_data.terminal_id, cJSON_GetObjectItem(item, "id")->valuestring, sizeof(json_data.terminal_id) - 1);
2025-05-09 16:53:07 +08:00
// 准备 XML 内容并写入文件
std::string xmlContent = prepare_update(code_str, json_data,guid);//添加guid20250506
2025-01-16 16:17:01 +08:00
if(xmlContent != ""){
char nodeid[20];
2025-05-09 16:53:07 +08:00
std::sprintf(nodeid, "%u", g_node_id); // "%u" 用于 unsigned int
2025-01-16 16:17:01 +08:00
std::string nodeid_str(nodeid);
std::string frontindex_str = intToString(g_front_seg_index);
std::string file_name = output_dir + "/" + nodeid_str + "_" + frontindex_str + "_" + json_data.terminal_id + "_delete_terminal.xml";
2025-05-09 16:53:07 +08:00
writeToFile(file_name, xmlContent); //写文件加上guid读文件
2025-01-16 16:17:01 +08:00
}
}
}
}
}
else{
std::cout << "code_str error" <<std::endl;
}
2025-05-09 16:53:07 +08:00
// 释放 JSON 对象
2025-01-16 16:17:01 +08:00
cJSON_Delete(root);
2025-05-30 15:40:20 +08:00
return 0;
2025-01-16 16:17:01 +08:00
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
int find_dev_index_from_dev_id(std::string dev_id)
{
ied_t* ied = NULL;
int iedno;
ied_usr_t* ied_usr = NULL;
for (iedno = 0; iedno < g_node->n_clients; iedno++) {
ied = g_node->clients[iedno];
ied_usr = (ied_usr_t*)ied->usr_ext;
if (ied_usr && strcmp(ied_usr->terminal_id, dev_id.c_str()) == 0) {
return ied_usr->dev_idx;
}
}
return 0;
}
int find_mp_index_from_mp_id(std::string line)
{
LD_info_t* LD_info = NULL;
LD_info = find_LD_info_only_from_mp_id((char*)line.c_str());
if(LD_info == NULL){
return 0;
}
else{
return LD_info->line_id;
}
}
2026-02-04 09:21:54 +08:00
char* find_mp_name_from_mp_id(const char* mp_id)
{
LD_info_t* LD_info = NULL;
LD_info = find_LD_info_only_from_mp_id((char*)mp_id);
if(LD_info == NULL){
return 0;
}
else{
return LD_info->name;
}
}
2025-01-16 16:17:01 +08:00
/**
 * @brief Consumer callback for real-time data trigger messages.
 *
 * Parses the message body with parseJsonMessageRT(), resolves the device
 * and monitoring-point indexes while holding `mtx`, and calls
 * createXmlFile() to create the trigger file.
 *
 * @param consumer Consumer that delivered the message (unused here).
 * @param msg      Delivered message.
 * @return 1 when INITFLAG is not 1; E_RECONSUME_LATER when the message or
 *         its body is null, parsing fails, either index resolves to 0, or
 *         the file cannot be created; E_CONSUME_SUCCESS otherwise.
 */
int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg)
{
    // Guard against crashes: do nothing unless INITFLAG is 1
    // (presumably set once start-up has finished — TODO confirm).
    if (INITFLAG != 1) {
        return 1;
    }

    if (msg == NULL) {
        std::cerr << "Received null message." << std::endl;
        return E_RECONSUME_LATER;
    }

    const char* payload = GetMessageBody(msg);
    const char* msg_key = GetMessageKeys(msg);
    if (payload == NULL) {
        std::cerr << "Message body is NULL." << std::endl;
        return E_RECONSUME_LATER;
    }

    // Record that a message was consumed.
    DIY_INFOLOG("process","【NORMAL】前置消费topic:%s_%s的实时触发消息",FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RT.c_str());

    // Echo the message for diagnostics.
    std::cout << "rt data Callback received message: " << payload << std::endl;
    std::cout << "Message Key: " << (msg_key ? msg_key : "N/A") << std::endl;

    // Decode the request.
    std::string devid, line;
    bool realData, soeData;
    int limit;
    const bool parsed = parseJsonMessageRT(payload, devid, line, realData, soeData, limit);
    if (!parsed) {
        std::cerr << "Failed to parse the JSON message." << std::endl;
        DIY_ERRORLOG_CODE("process",LOG_CODE_RT_DATA,"【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确",FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RT.c_str());
        return E_RECONSUME_LATER;
    }

    // The ledger lookups run under the lock.
    pthread_mutex_lock(&mtx);
    std::cout << "rtdata hold lock !!!!!!!!!!!" << std::endl;
    const int dev_index = find_dev_index_from_dev_id(devid);
    const int mp_index = find_mp_index_from_mp_id(line);
    pthread_mutex_unlock(&mtx);
    std::cout << "rtdata free lock !!!!!!!!!!!" << std::endl;

    // Both lookups return 0 when nothing matches.
    if (dev_index == 0 || mp_index == 0) {
        std::cerr << "dev index or mp index is 0" << std::endl;
        return E_RECONSUME_LATER;
    }

    // Create the trigger XML file.
    const bool created = createXmlFile(dev_index, mp_index, realData, soeData, limit,"new");
    if (!created) {
        DIY_ERRORLOG_CODE("process",LOG_CODE_RT_DATA,"【ERROR】前置无法创建实时数据触发文件");
        std::cerr << "Failed to create the XML file." << std::endl;
        return E_RECONSUME_LATER;
    }

    return E_CONSUME_SUCCESS;
}
/**
 * @brief Consumer callback for the ledger-update topic.
 *
 * Echoes the message and forwards its body to parse_control() together with
 * the ledger-update directory. A non-zero result from parse_control() is
 * logged as an error but the message is still acknowledged.
 *
 * @param consumer Consumer that delivered the message (not used here).
 * @param msg      Delivered message; may be NULL.
 * @return E_RECONSUME_LATER when INITFLAG != 1 or when msg or its body is
 *         NULL; E_CONSUME_SUCCESS otherwise.
 */
int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg)
{
    // Original note: "prevent crash". Presumably INITFLAG != 1 means start-up
    // has not finished, so the message is handed back for a later retry.
    if (INITFLAG != 1) {
        return E_RECONSUME_LATER;
    }

    if (msg == NULL) {
        std::cerr << "Received null message." << std::endl;
        return E_RECONSUME_LATER;
    }

    const char* body = GetMessageBody(msg);
    const char* key = GetMessageKeys(msg);
    if (body == NULL) {
        std::cerr << "Message body is NULL." << std::endl;
        return E_RECONSUME_LATER;
    }

    // Echo the message to the console.
    std::cout << "ledger update Callback received message: " << body << std::endl;
    std::cout << "Message Key: " << (key ? key : "N/A") << std::endl;

    // Handle the ledger update; a non-zero return is treated as failure.
    std::string updatefilepath = "/home/pq/FeProject/etc/ledgerupdate";
    if (parse_control(body, updatefilepath)) {
        DIY_ERRORLOG_CODE("process",LOG_CODE_LEDGER_UPDATE,"【ERROR】前置的%s%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_UD.c_str());
    }

    return E_CONSUME_SUCCESS;
}
/**
 * @brief Consumer callback for the process-control ("set") topic.
 *
 * Echoes the message and forwards its body to parse_set(). A non-zero result
 * from parse_set() is logged as an error but the message is still
 * acknowledged.
 *
 * @param consumer Consumer that delivered the message (not used here).
 * @param msg      Delivered message; may be NULL.
 * @return E_RECONSUME_LATER when INITFLAG != 1 or when msg or its body is
 *         NULL; E_CONSUME_SUCCESS otherwise.
 */
int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg)
{
    // Original note: "prevent crash". Presumably INITFLAG != 1 means start-up
    // has not finished, so the message is handed back for a later retry.
    if (INITFLAG != 1) {
        return E_RECONSUME_LATER;
    }

    if (msg == NULL) {
        std::cerr << "Received null message." << std::endl;
        return E_RECONSUME_LATER;
    }

    const char* body = GetMessageBody(msg);
    const char* key = GetMessageKeys(msg);
    if (body == NULL) {
        std::cerr << "Message body is NULL." << std::endl;
        return E_RECONSUME_LATER;
    }

    // Echo the message to the console.
    std::cout << "process Callback received message: " << body << std::endl;
    std::cout << "Message Key: " << (key ? key : "N/A") << std::endl;

    // Handle the process-control request; non-zero means failure.
    if (parse_set(body)) {
        DIY_ERRORLOG_CODE("process",LOG_CODE_PROCESS_CONTROL,"【ERROR】前置的%s%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_SET.c_str());
    }

    return E_CONSUME_SUCCESS;
}
2025-02-25 16:33:11 +08:00
/**
 * @brief Consumer callback for the log-upload request topic.
 *
 * Echoes the message and forwards its body to parse_log(). A non-zero result
 * from parse_log() is logged as an error but the message is still
 * acknowledged.
 *
 * @param consumer Consumer that delivered the message (not used here).
 * @param msg      Delivered message; may be NULL.
 * @return E_RECONSUME_LATER when INITFLAG != 1 or when msg or its body is
 *         NULL; E_CONSUME_SUCCESS otherwise.
 */
int myMessageCallbacklog(CPushConsumer* consumer, CMessageExt* msg)
{
    // Original note: "prevent crash". Presumably INITFLAG != 1 means start-up
    // has not finished, so the message is handed back for a later retry.
    if (INITFLAG != 1) {
        return E_RECONSUME_LATER;
    }

    if (msg == NULL) {
        std::cerr << "Received null message." << std::endl;
        return E_RECONSUME_LATER;
    }

    const char* body = GetMessageBody(msg);
    const char* key = GetMessageKeys(msg);
    if (body == NULL) {
        std::cerr << "Message body is NULL." << std::endl;
        return E_RECONSUME_LATER;
    }

    // Echo the message to the console. The prefix used to read "process",
    // copied from the process-control callback, which made the two
    // callbacks indistinguishable in the console output.
    std::cout << "log Callback received message: " << body << std::endl;
    std::cout << "Message Key: " << (key ? key : "N/A") << std::endl;

    // Handle the log-upload request; non-zero means failure.
    if (parse_log(body)) {
        DIY_ERRORLOG_CODE("process",LOG_CODE_LOG_REQUEST,"【ERROR】前置的%s%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_LOG.c_str());
    }

    return E_CONSUME_SUCCESS;
}
2025-01-16 16:17:01 +08:00
/**
 * @brief Consumer callback for the data-recall trigger topic.
 *
 * Extracts the data portion of the body with extractDataJson() and, when it
 * is not empty, passes it to recall_json_handle() while holding mtx. An empty
 * extraction result is logged as an error; the message is acknowledged either
 * way.
 *
 * @param consumer Consumer that delivered the message (not used here).
 * @param msg      Delivered message; may be NULL.
 * @return E_RECONSUME_LATER when INITFLAG != 1 or when msg or its body is
 *         NULL; E_CONSUME_SUCCESS otherwise.
 */
int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg)
{
    // Original note: "prevent crash". Presumably INITFLAG != 1 means start-up
    // has not finished, so the message is handed back for a later retry.
    if (INITFLAG != 1) {
        return E_RECONSUME_LATER;
    }

    // Debug trace.
    std::cout << "myMessageCallbackrecall"<< std::endl;

    if (msg == NULL) {
        std::cerr << "Received null message." << std::endl;
        return E_RECONSUME_LATER;
    }

    const char* body = GetMessageBody(msg);
    const char* key = GetMessageKeys(msg);
    if (body == NULL) {
        std::cerr << "Message body is NULL." << std::endl;
        return E_RECONSUME_LATER;
    }

    // Echo the message to the console.
    std::cout << "recall Callback received message: " << body << std::endl;
    std::cout << "Message Key: " << (key ? key : "N/A") << std::endl;

    // Pull the data portion out of the message.
    std::string result = extractDataJson(body);

    // Debug trace.
    std::cout << "extractDataJson:"<< result.c_str() <<std::endl;

    if (!result.empty()) {
        // The recall handler runs under mtx.
        pthread_mutex_lock(&mtx); std::cout << "recall mq hold lock !!!!!!!!!!!" << std::endl;
        recall_json_handle(result.c_str());
        pthread_mutex_unlock(&mtx); std::cout << "recall mq free lock !!!!!!!!!!!" << std::endl;
    }
    else {
        std::cerr << "recall data is NULL." << std::endl;
        DIY_ERRORLOG_CODE("process",LOG_CODE_RECALL,"【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RC.c_str());
    }

    return E_CONSUME_SUCCESS;
}
2025-02-25 16:33:11 +08:00
2025-01-16 16:17:01 +08:00
void mqconsumerThread::run()
{
2025-05-09 16:53:07 +08:00
// 配置消费者参数
std::string consumerName = subdir + intToString(g_front_seg_index) + "_start_" + QDateTime::currentDateTime().toString("yyyyMMddhhmmss").toStdString(); // 消费者组ID+启动时间,不消费历史消息
std::string nameServer = G_MQCONSUMER_IPPORT; // NameServer地址
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
// 定义多个主题、标签及其对应的回调函数
2025-01-16 16:17:01 +08:00
std::vector<Subscription> subscriptions;
2025-05-09 16:53:07 +08:00
// 初始化消费者1 //lnk20241230只有实时进程会订阅实时topic不订阅实时topic的进程无法触发实时数据
2025-01-16 16:17:01 +08:00
if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID){
2025-05-12 16:43:42 +08:00
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata));
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
// 初始化消费者2 //所有进程都会订阅台账更新topic不同功能进程的台账不能互相影响
2025-05-12 16:43:42 +08:00
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate));
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
// 初始化消费者3 //lnk20241230只有补招进程会订阅补招topic不订阅补招topic的进程无法触发补招数据
2025-01-16 16:17:01 +08:00
if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID){
2025-05-12 16:43:42 +08:00
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall));
2025-01-16 16:17:01 +08:00
}
2025-05-12 16:43:42 +08:00
// 初始化消费者4 //lnk20250108只有稳态进程1会控制reset
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset));
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
// 初始化消费者5 //所有进程都会订阅日志上送topic不同功能进程的日志上送不能互相影响
2025-05-12 16:43:42 +08:00
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog));
2025-02-25 16:33:11 +08:00
2025-01-16 16:17:01 +08:00
try {
rocketmq_consumer_receive(consumerName, nameServer, subscriptions);
}
catch (const std::exception& e) {
std::cerr << "Exception during consumerUD setup: " << e.what() << std::endl;
}
2025-05-09 16:53:07 +08:00
// 程序运行中,消费者会通过回调处理消息
2025-01-16 16:17:01 +08:00
std::cout << "Consumer is running. " << std::endl;
}
//CZY 2023-08-23 get double class voltage level, if false will return 0;
/**
 * @brief Convert a voltage-level code (decimal text) into a voltage in kV.
 *
 * The text is converted with atoi() and looked up in a fixed table. Codes
 * 1-37 are AC levels, codes 51-88 are DC levels.
 *
 * @param voltage_level_char NUL-terminated decimal code; NULL is accepted.
 * @return Voltage in kV for a known code; 0 for NULL, non-numeric text or an
 *         unknown code.
 */
double get_voltage_level(char voltage_level_char[]) {
    struct VoltageCode {
        int code;   // voltage-level code
        double kv;  // voltage in kV
    };
    static const VoltageCode kVoltageTable[] = {
        // AC levels
        {  1, 0.006 },  // AC 6 V
        {  2, 0.012 },  // AC 12 V
        {  3, 0.024 },  // AC 24 V
        {  4, 0.036 },  // AC 36 V
        {  5, 0.048 },  // AC 48 V
        {  6, 0.11  },  // AC 110 V
        {  7, 0.22  },  // AC 220 V
        {  8, 0.38  },  // AC 380 V (includes 400 V)
        {  9, 0.66  },  // AC 660 V
        { 10, 1     },  // AC 1000 V (includes 1140 V)
        { 11, 0.6   },  // AC 600 V
        { 12, 0.75  },  // AC 750 V
        { 13, 1.5   },  // AC 1500 V
        { 14, 2.0   },  // AC 2000 V
        { 15, 2.5   },  // AC 2500 V
        { 20, 3     },  // AC 3 kV
        { 21, 6     },  // AC 6 kV
        { 22, 10    },  // AC 10 kV
        { 23, 15.75 },  // AC 15.75 kV
        { 24, 20    },  // AC 20 kV
        { 25, 35    },  // AC 35 kV
        { 30, 66    },  // AC 66 kV
        { 31, 72.5  },  // AC 72.5 kV
        { 32, 110   },  // AC 110 kV
        { 33, 220   },  // AC 220 kV
        { 34, 330   },  // AC 330 kV
        { 35, 500   },  // AC 500 kV
        { 36, 750   },  // AC 750 kV
        { 37, 1000  },  // AC 1000 kV
        // DC levels
        { 51, 0.006 },  // DC 6 V
        { 52, 0.012 },  // DC 12 V
        { 53, 0.024 },  // DC 24 V
        { 54, 0.036 },  // DC 36 V
        { 55, 0.048 },  // DC 48 V
        { 56, 0.11  },  // DC 110 V
        { 60, 0.22  },  // DC 220 V
        { 70, 0.6   },  // DC 600 V
        { 71, 0.75  },  // DC 750 V
        { 72, 1.5   },  // DC 1500 V
        { 73, 3.0   },  // DC 3000 V
        { 76, 35    },  // DC 35 kV
        { 77, 30    },  // DC 30 kV
        { 78, 50    },  // DC 50 kV
        { 80, 120   },  // DC 120 kV
        { 81, 125   },  // DC 125 kV
        { 82, 400   },  // DC 400 kV
        { 83, 500   },  // DC 500 kV
        { 84, 660   },  // DC 660 kV
        { 85, 800   },  // DC 800 kV
        { 86, 1000  },  // DC 1000 kV
        { 87, 200   },  // DC 200 kV
        { 88, 320   }   // DC 320 kV
    };

    // atoi() must not be handed a null pointer.
    if (voltage_level_char == NULL) {
        return 0;
    }

    const int code = atoi(voltage_level_char);
    const unsigned int entries = sizeof(kVoltageTable) / sizeof(kVoltageTable[0]);
    for (unsigned int i = 0; i < entries; ++i) {
        if (kVoltageTable[i].code == code) {
            return kVoltageTable[i].kv;
        }
    }
    return 0;  // unknown code
}
/**
 * @brief Start myThrd the first time this function is called.
 *
 * A function-local flag records that start() has been issued; later calls
 * return without doing anything. The flag itself is not synchronised.
 */
void try_start_kafka_thread()
{
    static int started = 0;
    if (started) {
        return;
    }
    myThrd.start();
    started = 1;
}
//lnk20241213
/**
 * @brief Start mqconsumerThrd the first time this function is called.
 *
 * A function-local flag records that start() has been issued; later calls
 * return without doing anything. The flag itself is not synchronised.
 */
void try_start_mqconsumer_thread()
{
    static int started = 0;
    if (started) {
        return;
    }
    mqconsumerThrd.start();
    started = 1;
}
/////////////////////////////////////////////////////////////////////////
// Single file-scope json_block_data instance.
// NOTE(review): the json_block_* functions in this file work on per-id map
// entries instead of this object - confirm whether anything still reads it.
json_block_data json_blkd;
2025-05-09 16:53:07 +08:00
//CZY 2023-08-17 WW 2022年12月6日14:09:08 增加多个ICD支持
//json_block_data json_blkd; //json拼接参数类对象,原有的一个数据对象在多ICD下会出现数据错位问题
//解决方案是将此数据放入LDInfo结构中存储保证一条线路一个json拼接参数类对象
void init_json_block_data(char mp_id[], char voltage_level[], int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
2025-01-16 16:17:01 +08:00
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_flicker_data_map.insert(mp_id, pdata);
}
pdata = json_flicker_data_map.value(mp_id);
}
else if (flicker_flag == 0) {
if (!json_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_data_map.insert(mp_id, pdata);
}
pdata = json_data_map.value(mp_id);
}
else if (flicker_flag == 2) {
if (!json_pst_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_pst_data_map.insert(mp_id, pdata);
}
pdata = json_pst_data_map.value(mp_id);
}
if (pdata == NULL)
return;
pdata->monitorId = -1;
QString tmp;
tmp.append(mp_id);
pdata->mp_id = tmp;
pdata->func_type = g_node_id;
2025-05-09 16:53:07 +08:00
//flag 是品质, 异常送1 正常送0
pdata->flag = 0; // //剔除标记1不剔除0剔除默认剔除
2025-01-16 16:17:01 +08:00
pdata->mms_str_map.clear();
pdata->voltage_level = get_voltage_level(voltage_level); //CZY 2023-08-23 add voltage_level
}
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
int json_block_create_start(char voltage_level[], char monid_char[], int flicker_flag, char temcode[], int line_id)//WW 2023年3月13日16:38 : 41 多ICD修改
2025-01-16 16:17:01 +08:00
{
try_start_kafka_thread();
init_json_block_data(monid_char, voltage_level, flicker_flag);
json_block_data* pdata;
if (flicker_flag == 1) {
2025-05-09 16:53:07 +08:00
if (!json_flicker_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
2025-05-09 16:53:07 +08:00
if (!json_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
2025-05-09 16:53:07 +08:00
if (!json_pst_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
{
pdata->dev_type.append(temcode);
pdata->monitorId = line_id;
if (strlen(monid_char) != 0) {
QString tmp;
tmp.append(monid_char);
pdata->mp_id = tmp;
}
else {
monid_char = "not define";
QString tmp;
tmp.append(monid_char);
pdata->mp_id = tmp;
}
}
printf("\n\n---------- json_block_create_start: mp_id=%s,voltage_level=%s,line_id=%d \n", monid_char, voltage_level, line_id);
return TRUE;
}
2025-04-29 15:05:36 +08:00
2025-05-09 16:53:07 +08:00
int json_block_create_time(char monid_char[], long long Time, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
2025-01-16 16:17:01 +08:00
{
json_block_data* pdata;
if (flicker_flag == 1) {
2025-05-09 16:53:07 +08:00
if (!json_flicker_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
2025-05-09 16:53:07 +08:00
if (!json_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
2025-05-09 16:53:07 +08:00
if (!json_pst_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
pdata->time = Time;
printf("\njson_block_create_time: mp_id=%s,Time=%lld \n", monid_char, Time);
return TRUE;
}
2025-05-09 16:53:07 +08:00
int json_block_create_flag(char monid_char[], int flag, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
2025-01-16 16:17:01 +08:00
{
json_block_data* pdata;
if (flicker_flag == 1) {
2025-05-09 16:53:07 +08:00
if (!json_flicker_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
2025-05-09 16:53:07 +08:00
if (!json_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
2025-05-09 16:53:07 +08:00
if (!json_pst_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
pdata->flag = flag;
printf("\njson_block_create_flag: mp_id=%s,flag=%d \n", monid_char, flag);
return TRUE;
}
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
int json_block_create_data(char monid_char[], char* mms_str, double v, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
2025-01-16 16:17:01 +08:00
{
json_block_data* pdata;
if (flicker_flag == 1) {
2025-05-09 16:53:07 +08:00
if (!json_flicker_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
2025-05-09 16:53:07 +08:00
if (!json_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
2025-05-09 16:53:07 +08:00
if (!json_pst_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
return 0;
pdata = json_pst_data_map.value(monid_char);
}
static int count = 0;
if (pdata != NULL)
{
pdata->mms_str_map.insert(QString::fromAscii(mms_str), v);
2026-03-04 19:15:32 +08:00
if (strstr(mms_str, "MMXU2$MX$PhV")){
pdata->data_have_statistic = 1;
2025-01-16 16:17:01 +08:00
printf("---------- json_block_create_data: mp_id= %s ,mms_str=%s value=%fkV----------\n", monid_char, mms_str, v);
2026-03-04 19:15:32 +08:00
}
2025-01-16 16:17:01 +08:00
}
return TRUE;
}
2025-05-09 16:53:07 +08:00
//lnk2024-8-16添加接线参数
int json_block_create_end(char v_wiring_type[], char monid_char[], int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
2025-01-16 16:17:01 +08:00
{
json_block_data* pdata;
if (flicker_flag == 1) {
2025-05-09 16:53:07 +08:00
if (!json_flicker_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
{
printf("---------- json_block_create_end: mp_id= %s json_flicker_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
2025-05-09 16:53:07 +08:00
if (!json_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
{
printf("---------- json_block_create_end: mp_id= %s json_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
2025-05-09 16:53:07 +08:00
if (!json_pst_data_map.contains(monid_char))//未查到数据
2025-01-16 16:17:01 +08:00
{
printf("---------- json_block_create_end: mp_id= %s json_pst_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_pst_data_map.value(monid_char);
}
2025-07-29 18:10:03 +08:00
//调试用
/*if (pdata != nullptr) {
printf("monitorId: %d\n", pdata->monitorId);
printf("func_type: %d\n", pdata->func_type);
printf("flag: %d\n", pdata->flag);
printf("time: %lld\n", pdata->time);
printf("voltage_level: %f\n", pdata->voltage_level);
printf("mp_id: %s\n", pdata->mp_id.toStdString().c_str());
printf("dev_type: %s\n", pdata->dev_type.toStdString().c_str());
printf("mms_str_map count: %d\n", pdata->mms_str_map.size());
} else {
printf("pdata is NULL\n");
}*/
2025-01-16 16:17:01 +08:00
if (pdata->mms_str_map.count() == 0) {
if (flicker_flag == 1) {
json_flicker_data_map.remove(monid_char);
}
else if (flicker_flag == 0)
{
json_data_map.remove(monid_char);
}
else if (flicker_flag == 2)
{
json_pst_data_map.remove(monid_char);
}
printf("---------- json_block_create_end: pdata->mms_str_map.count() == 0 ----------\n");
return 1;
}
2025-05-09 16:53:07 +08:00
//lnk2024-8-16添加接线参数
2025-02-14 16:44:38 +08:00
int ret = transfer_json_block_data(v_wiring_type, pdata);
2025-01-16 16:17:01 +08:00
if (pdata != NULL)
delete pdata;
if (flicker_flag == 1) {
json_flicker_data_map.remove(monid_char);
}
else if (flicker_flag == 0)
{
json_data_map.remove(monid_char);
}
else if (flicker_flag == 2)
{
json_pst_data_map.remove(monid_char);
}
printf("---------- json_block_create_end: MonitorId= %s ----------\n", monid_char);
return ret;
}
//////////////////////////////////////////////////////////////////////////////
void clear_old_comtrade_files()
{
if (g_node_id != SOE_COMTRADE_BASE_NODE_ID)
return;
QString full_fn_str;
QString dir_name("../comtrade/");
QDir directory_comtrade(dir_name);
QStringList fileNames = directory_comtrade.entryList(QDir::Files | QDir::NoDotAndDotDot, QDir::Time);
if (fileNames.size() <= comtrade_remain_file_num)
return;
for (QStringList::size_type i = comtrade_remain_file_num; i != fileNames.size(); ++i) {
full_fn_str = dir_name + fileNames.at(i);
QFile::remove(full_fn_str);
}
}
/////////////////////////////////////////////
/**
 * @brief Prompt for a password on stdin and compare its encoded form with the
 *        expected value.
 *
 * Terminal echo is switched off with "stty -echo" while the password is read
 * and switched back on afterwards. The input is encoded with MyGetSM4Code()
 * and compared with a fixed string.
 *
 * @return The strcmp() result: 0 when the encoded input equals the expected
 *         value, non-zero otherwise.
 *
 * NOTE(review): both the expected digest and the key passed to MyGetSM4Code()
 * are hard-coded in this function.
 */
int process_login_verify()
{
    char password[64 + 1];
    char encode_password[256];
    const char* passwordConfirm = "1c0e4e104de596846648ba495bd32601";

    memset(password, 0, sizeof(password));
    // Zeroed so the comparison below never runs over indeterminate bytes if
    // the encoder writes less than a full terminated string.
    memset(encode_password, 0, sizeof(encode_password));

    printf("Please input password : \n");
    system("stty -echo");
    std::cin.getline(password, 64);
    system("stty echo");
    password[sizeof(password) - 1] = '\0';

    MyGetSM4Code(password, (unsigned char*)"epri.sgcc.com.cn", encode_password);
    return (strcmp(encode_password, passwordConfirm));
}
///////////////////////////////////////////
/**
 * @brief Start socketThrd the first time this function is called.
 *
 * A function-local flag records that start() has been issued; later calls
 * return without doing anything. The flag itself is not synchronised.
 */
void try_start_socket_thread()
{
    static int started = 0;
    if (started) {
        return;
    }
    socketThrd.start();
    started = 1;
}
//lnk20241029
/**
 * @brief Start webhttpThrd the first time this function is called.
 *
 * A function-local flag records that start() has been issued; later calls
 * return without doing anything. The flag itself is not synchronised.
 */
void try_start_web_http_thread()
{
    static int started = 0;
    if (started) {
        return;
    }
    webhttpThrd.start();
    started = 1;
}
/**
 * @brief Start httpThrd the first time this function is called.
 *
 * A function-local flag records that start() has been issued; later calls
 * return without doing anything. The flag itself is not synchronised.
 */
void try_start_http_thread()
{
    static int started = 0;
    if (started) {
        return;
    }
    httpThrd.start();
    started = 1;
}
//lnk20241202
int try_start_mqtest_thread(int argc, char *argv[])
{
2025-05-09 16:53:07 +08:00
//不使用简单的循环线程而是启动一个app不仅执行循环线程而且可以连接输入
//安装qt打印
2025-03-03 18:20:00 +08:00
qInstallMsgHandler(myQtMsgHandler);
2025-01-16 16:17:01 +08:00
QCoreApplication a(argc, argv);
2025-05-09 16:53:07 +08:00
// 创建 QThread 和 Worker 对象
2025-01-16 16:17:01 +08:00
QThread *thread = new QThread();
Worker *worker = new Worker();
2025-05-09 16:53:07 +08:00
// 将 Worker 对象移动到 QThread 中
2025-01-16 16:17:01 +08:00
worker->moveToThread(thread);
2025-05-09 16:53:07 +08:00
// 连接信号和槽
2025-01-16 16:17:01 +08:00
QObject::connect(thread, SIGNAL(started()), worker, SLOT(startServer()));
QObject::connect(worker, SIGNAL(serverError()), thread, SLOT(quit()));
QObject::connect(worker, SIGNAL(serverError()), worker, SLOT(deleteLater()));
QObject::connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
2025-05-09 16:53:07 +08:00
// 启动线程
2025-01-16 16:17:01 +08:00
thread->start();
2025-05-09 16:53:07 +08:00
// 确保在应用退出时,线程也能正确退出
2025-01-16 16:17:01 +08:00
QObject::connect(&a, SIGNAL(aboutToQuit()), thread, SLOT(quit()));
return a.exec();
}
/**
 * @brief Start onTimerThrd the first time this function is called.
 *
 * A function-local flag records that start() has been issued; later calls
 * return without doing anything. The flag itself is not synchronised.
 */
void try_start_ontimer_thread()
{
    static int started = 0;
    if (started) {
        return;
    }
    onTimerThrd.start();
    started = 1;
}
//WW 2023-08-22 end
///////////////////////////////////////////
2025-05-09 16:53:07 +08:00
//ZW 2024-01-31 optimisation of the data-recall mode
static QMap<QString, int> mvl_type_ctrl_map;//ZW 2024-01-31 holds the models obtained in a single fetch; keyed by the doname passed to add_mvl_type_ctrl()
static int mvl_type_ctrl_map_size;//counter
//添加doname对应的数据模型
2025-01-16 16:17:01 +08:00
/**
 * @brief Remember the data-model id @p ctrl for @p doname.
 *
 * An id already stored for @p doname is kept; the new one is ignored.
 *
 * @param doname Key under which the id is stored.
 * @param ctrl   Id to store.
 */
void add_mvl_type_ctrl(char doname[], int ctrl)
{
    if (mvl_type_ctrl_map.contains(doname))
    {
        return; // first registration wins
    }
    mvl_type_ctrl_map.insert(doname, ctrl);
}
2025-05-09 16:53:07 +08:00
//删除map中所有数据模型
2025-01-16 16:17:01 +08:00
/**
 * @brief Destroy every stored data-model id and empty the map.
 *
 * mvl_type_id_destroy() is called once per stored value, then the map is
 * cleared.
 */
void del_mvl_type_ctrl()
{
    // Only the values are needed; the keys are not copied.
    for (QMap<QString, int>::const_iterator it = mvl_type_ctrl_map.constBegin();
         it != mvl_type_ctrl_map.constEnd(); ++it) {
        mvl_type_id_destroy(it.value());
    }
    mvl_type_ctrl_map.clear();
}
/**
 * @brief Number of data-model ids currently stored in mvl_type_ctrl_map.
 */
int get_mvl_type_ctrl_map_size()
{
    const int stored = mvl_type_ctrl_map.size();
    return stored;
}
2025-05-09 16:53:07 +08:00
//查找对应doname的数据模型是否存在map中
2025-01-16 16:17:01 +08:00
/**
 * @brief Look up the data-model id stored for @p doname.
 *
 * @param doname Key to look up.
 * @return The stored id, or -1 when @p doname has no entry.
 */
int sel_mvl_type_ctrl_flag(char doname[])
{
    // value(key, default) yields the default when the key is absent.
    return mvl_type_ctrl_map.value(doname, -1);
}
//ZW 2024-01-31 end