Files
microser/json/save2json.cpp

2602 lines
80 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file: $RCSfile: save2json.cpp,v $
* @brief: $IEC 61850 Protocol
*
* @version: $Revision: 1.21 $
* @date: $Date: 2020/10/27 08:51:07 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: save2json.cpp,v 1.21 2020/10/27 08:51:07 lizhongming Exp $
*
*/
using namespace std;
#include <iostream>
#include <stdio.h>
#include <assert.h>
#include <fstream> // std::filebuf
#include <string.h>
#include <sstream>
#include "qdebug.h"
#include <QSettings>
#include <QDateTime>
#include <QDir>
#include <QMap>//CZY 2023-08-17 WW 2023年3月13日17:21:02 增加多ICD支持
#include <apr_uuid.h>
#include <apr_strings.h>
#include "../log4cplus/log4.h"//lnk添加log4
#include "../mms/db_interface.h"
#include "../json/save2json.h"
#include "../json/mms_json_inter.h"
#include "kafka_producer.h"
#include "../rocketmq/CPushConsumer.h"
#include <vector>
#include "../json/cjson.h" //解json
#include <sstream> //创建xml
#include <fstream> //创建xml
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type);
extern int recall_json_handle(const char* jstr);
extern std::string intToString(int number);
int StringToInt(const std::string& str);
extern pthread_mutex_t mtx;//lnk20250115
#ifdef __cplusplus
extern "C" {
//解决编译lnk20250509
#define thisFileName __FILE__
#include "../mms/rdb_client.h"
#include "node.h"//lnk20241223
#include "mvl_defs.h"
#include "mms_vvar.h"
#endif /* __cplusplus */
extern unsigned int g_node_id;
extern int g_front_seg_index;
extern char subdir[128];
extern int comtrade_remain_file_num;
extern node_t* g_node; //lnk20241223
extern LD_info_t* find_LD_info_only_from_mp_id(char* mp_id);//lnk20241223
extern void print_terminal(const terminal* tmnl);
#ifdef __cplusplus
}
#endif
#ifndef nullptr
#define nullptr NULL
#endif
extern uint32_t g_mqproducer_blocked_times;
extern int INITFLAG;
extern std::string FRONT_INST;
extern QMutex kafka_data_list_mutex;
extern QList<Ckafka_data_t> kafka_data_list;
extern QMutex oss_data_list_mutex;
extern QList<oss_data_t> oss_data_list;
extern int FILE_FLAG;
KafkaSendThread myThrd;
//WW 2023-08-22 增加数据库线程和WebSokcet线程
WebSocketThread socketThrd; //Web Socket线程类对象
WebhttpThread webhttpThrd; //Web http线程类对象 lnk202411
httpThread httpThrd; //Web http线程类对象 lnk202411
mqconsumerThread mqconsumerThrd;//mq消费者线程lnk20241213
OnTimerThread onTimerThrd;//定时线程
extern int FILE_FLAG;
extern int SEND_FLAG;
extern char* BROKER_LIST;
extern char* TOPIC_STAT;
extern char* TOPIC_PST;
extern char* TOPIC_PLT;
extern char* TOPIC_EVENT;
extern char* TOPIC_ALARM;
extern char* TOPIC_SNG;
extern char* TOPIC_RTDATA;//lnk20241220
extern char* UDS_UPLOAD_URL;
extern char g_onlyIP[255]; //直连某个IP仅仅为方便测试
//WW 2023-08-22 end
//lnk20241216添加mq消费者
extern std::string G_MQCONSUMER_IPPORT;//rocketmq ip+port
extern std::string G_MQCONSUMER_TOPIC_RT;//topie_realtimedata
extern std::string G_MQCONSUMER_TAG_RT;//tag
extern std::string G_MQCONSUMER_KEY_RT;//key
extern std::string G_MQCONSUMER_TOPIC_UD;//topie_update
extern std::string G_MQCONSUMER_TAG_UD;//tag
extern std::string G_MQCONSUMER_KEY_UD;//key
extern std::string G_MQCONSUMER_TOPIC_RC;//topie_recall
extern std::string G_MQCONSUMER_TAG_RC;//tag
extern std::string G_MQCONSUMER_KEY_RC;//key
extern std::string G_MQCONSUMER_TOPIC_SET;//topie_recall
extern std::string G_MQCONSUMER_TAG_SET;//tag
extern std::string G_MQCONSUMER_KEY_SET;//key
extern std::string G_MQCONSUMER_TOPIC_LOG;//topie_log
extern std::string G_MQCONSUMER_TAG_LOG;//tag
extern std::string G_MQCONSUMER_KEY_LOG;//key
extern std::string G_LOG_TOPIC;//topie
extern std::string G_LOG_TAG;//tag
extern std::string G_LOG_KEY;//key
bool showinshellflag =false;
#define APRTIME_8H (28800000000ULL)
#define APRTIME_1H (3600000000ULL)
///////////////////////////////////////////////////////////////////////////////
const int MAX_LIST_SIZE = 16;
//static QMap<int, QList<long long> > real_data_report_map;
static QMap<int, QMap<int, QList<long long>>> real_data_report_map; //多个监测点的多个实时报告的时间列表lnk20250624
static QMap<QString, json_block_data*> json_data_map;//CZY 2023-08-17 ww 2023年3月13日17:23:17扩展Map用于保存各条线路的数据
static QMap<QString, json_block_data*> json_flicker_data_map;//CZY 2023-09-11 展Map用于保存各条线路的闪变数据
static QMap<QString, json_block_data*> json_pst_data_map;//CZY 2023-09-11 展Map用于保存各条线路的闪变数据
bool is_blank(const std::string& str)
{
for (std::string::const_iterator it = str.begin(); it != str.end(); ++it)
{
if (!std::isspace(*it)) {
return false;
}
}
return true;
}
///////////////////////////////////////////////////////////////////////////////////
int urcbRealDataHasReceived(int dev_index, int rptNo, LD_info_t* LD_info, long long Time) //增加报告入参lnk20250624
{
QList<long long>& ts_list = real_data_report_map[LD_info->line_id][rptNo];
bool bFind = ts_list.contains(Time); //实时数据时间链表
if (bFind == false) {
ts_list.append(Time);
if (ts_list.size() > MAX_LIST_SIZE)
ts_list.removeFirst();
//lnk20241223每收到一次实时数据就检查一下数量
int real_report_count = 0;
//real_report_count = get_real_report_count(LD_info);
real_report_count = LD_info->rptinfo[rptNo]->count;//lnk20250624
//调试
std::cout << "real_report_count is" << real_report_count << std::endl;
std::cout << "mp limit is" << LD_info->limit << std::endl;
if(real_report_count >= LD_info->limit){
std::cout << "real_report_count reach limit!!!"<< std::endl;
//生成delete.xml
if (!createXmlFile(dev_index, LD_info->line_id, 0, 0, 0,"delete")) {
std::cerr << "Failed to create delete XML file!!!." << std::endl;
}
}
return 0; //没有重复数据
}
else
return 1; //有重复数据
}
//////////////////////////////////////////////////////////////////////////
void add_comm_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
if (g_node_id == STAT_DATA_BASE_NODE_ID)
com_log_fn += "comm_100_stat.txt";
else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID)
com_log_fn += "comm_200_3s.txt";
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID)
com_log_fn += "comm_300_comtrade.txt";
else if (g_node_id == HIS_DATA_BASE_NODE_ID)
com_log_fn += "comm_400_his.txt";
else if (g_node_id == NEW_HIS_DATA_BASE_NODE_ID) {
com_log_fn.append(QString("comm_400_his_%1.txt").arg(g_front_seg_index));
}
else if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID)
com_log_fn += "comm_600_recall.txt";
else if (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) {
com_log_fn.append(QString("comm_700_allrecall_%1.txt").arg(g_front_seg_index));
}
else
com_log_fn += "comm_x00_unknown.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
void add_sng_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
com_log_fn += "sng_kafka_json.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
void add_stat_kafka_json_log(char* log_str)
{
QDateTime now = QDateTime::currentDateTime();
QString level_str = QString("[info]");
QString head_str = QString("");
QString tail_str = QString("");
#ifdef __GNUC__
QString com_log_fn("/usr/local/saslog/");
#else
QString com_log_fn("../etc/log/");
#endif
com_log_fn += "stat_kafka_json.txt";
QFile file(com_log_fn);
if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append))
return;
QTextStream out(&file);
out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl;
}
//////////////////////////////////////////////////////////////////////////
/*新增rocketmq发送数据lnk10-10*/
void my_rocketmq_send(Ckafka_data_t& data)
{
static std::string topic;
static std::string cfg_His_tp;
static std::string cfg_PLT_tp;
static std::string cfg_PST_tp;
static std::string cfg_Evt_tp;
static std::string cfg_Alm_tp;
static std::string cfg_Rt_tp;
static bool init = false;
if (!init) {
cfg_His_tp = TOPIC_STAT;
cfg_PLT_tp = TOPIC_PLT;
cfg_PST_tp = TOPIC_PST;
cfg_Evt_tp = TOPIC_EVENT;
cfg_Alm_tp = TOPIC_ALARM;
cfg_Rt_tp = TOPIC_RTDATA;
init = true;
}
std::string key = data.mp_id.toStdString();
std::string senddata = data.strText.toStdString();
if (data.strTopic == "HISDATA")
{
topic = cfg_His_tp;
}
else if (data.strTopic == "PLT")
{
topic = cfg_PLT_tp;
}
else if (data.strTopic == "PST")
{
topic = cfg_PST_tp;
}
else if (data.strTopic == "Event")
{
topic = cfg_Evt_tp;
}
else if (data.strTopic == "Alm")
{
topic = cfg_Alm_tp;
}
else if (data.strTopic == "RTDATA")//lnk20241220
{
topic = cfg_Rt_tp;
}
else
{
topic = data.strTopic.toStdString();
}
if (g_onlyIP[0] != 0)
{
//单例模式
add_sng_log(data.strText.toAscii().data());
}
//rocketmq_producer_send(const_cast<char*>(senddata.c_str()),const_cast<char*>(topic.c_str()));
rocketmq_producer_send(senddata.c_str(), topic.c_str());//lnk20250623修复偶发性doublefree
}
void my_kafka_send(Ckafka_data_t& data)
{
#ifdef __GNUC__
static FeKafkaProducer kafkaProducer;
#endif
int retsize = -1;
static std::string topic;
static std::string cfg_His_tp;
static std::string cfg_PLT_tp;
static std::string cfg_PST_tp;
static std::string cfg_Evt_tp;
static std::string cfg_Alm_tp;
static std::string cfg_Sng_tp;
static bool init = false;
if (!init) {
cfg_His_tp = TOPIC_STAT;
cfg_PLT_tp = TOPIC_PLT;
cfg_PST_tp = TOPIC_PST;
cfg_Evt_tp = TOPIC_EVENT;
cfg_Alm_tp = TOPIC_ALARM;
cfg_Sng_tp = TOPIC_SNG;
cout << cfg_His_tp << endl;
std::string brokerlist = BROKER_LIST;
#ifdef __GNUC__
if (kafkaProducer.init(brokerlist)) {
printf("kafka producer init success(%s)\n", brokerlist.c_str());
/*bool ret = kafkaProducer.create_topic(topic);
if(ret)
printf("create topic OK \n");
else
printf("create topic Failed \n");*/
}
else
printf("kafka producer init Failed(%s)\n", brokerlist.c_str());
#endif
init = true;
}
char tmp_str[256];
apr_snprintf(tmp_str, sizeof(tmp_str), "%d", data.monitor_id);
std::string key = std::string(tmp_str);
std::string senddata = data.strText.toStdString();
if (data.strTopic == "HISDATA")
{
topic = cfg_His_tp;
}
else if (data.strTopic == "PLT")
{
topic = cfg_PLT_tp;
}
else if (data.strTopic == "PST")
{
topic = cfg_PST_tp;
}
else if (data.strTopic == "Event")
{
topic = cfg_Evt_tp;
add_stat_kafka_json_log(data.strText.toAscii().data());
}
else if (data.strTopic == "Alm")
{
topic = cfg_Alm_tp;
}
else
{
topic = data.strTopic.toStdString();
}
if (g_onlyIP[0] != 0)
{
add_sng_log(data.strText.toAscii().data());
}
#ifdef __GNUC__
retsize = kafkaProducer.send(senddata.c_str(), senddata.length(), topic, RdKafka::Topic::PARTITION_UA, &key);
#endif
if (retsize > 0) {
printf("\nkafka send, monitor_id:[%s] topic:[%s] Success,return length %d\n", key.c_str(), topic.c_str(), retsize);
}
else
printf("\nFailed kafka send, monitor_id:[%s] topic:[%s]\n", key.c_str(), topic.c_str());
}
void my_datahub_send(Ckafka_data_t& data)
{
static std::string topic;
static std::string cfg_His_tp;
static std::string cfg_PLT_tp;
static std::string cfg_PST_tp;
static std::string cfg_Evt_tp;
static std::string cfg_Alm_tp;
static bool init = false;
if (!init) {
cfg_His_tp = TOPIC_STAT;
cfg_PLT_tp = TOPIC_PLT;
cfg_PST_tp = TOPIC_PST;
cfg_Evt_tp = TOPIC_EVENT;
cfg_Alm_tp = TOPIC_ALARM;
init = true;
}
std::string key = data.mp_id.toStdString();
std::string senddata = data.strText.toStdString();
if (data.strTopic == "HISDATA")
{
topic = cfg_His_tp;
}
else if (data.strTopic == "PLT")
{
topic = cfg_PLT_tp;
}
else if (data.strTopic == "PST")
{
topic = cfg_PST_tp;
}
else if (data.strTopic == "Event")
{
topic = cfg_Evt_tp;
}
else if (data.strTopic == "Alm")
{
topic = cfg_Alm_tp;
}
else
{
topic = data.strTopic.toStdString();
}
if (g_onlyIP[0] != 0)
{
//单例模式
add_sng_log(data.strText.toAscii().data());
}
DataHub_Send_Datahub(const_cast<char*>(topic.c_str()), const_cast<char*>(senddata.c_str()));
printf("\ndatahub send, monitor_id:[%s] topic:[%s] Success\n", key.c_str(), topic.c_str());
}
void concatenate_and_separate(char str1[], char str2[], QString* result) {
QString qstr1 = QString(str1);
QString qstr2 = QString(str2);
*result = qstr1 + "-" + qstr2;
}
void KafkaSendThread::run()
{
//线程开始创建生产者lnk20241211
InitializeProducer();
printf("\nKafkaSendThread::run() is called ...... \n\n");
while (1) {
Ckafka_data_t data;
bool data_gotten;
data_gotten = false;
kafka_data_list_mutex.lock();
if (!kafka_data_list.isEmpty()) {
data_gotten = true;
data = kafka_data_list.takeFirst();
}
kafka_data_list_mutex.unlock();
if (data_gotten) {
static uint32_t count = 0;
printf("BEGIN my_kafka_send no.%i -------->>>>>>>>>>>> %s \n", count,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
if (SEND_FLAG == 1) //kafka推送
{
my_kafka_send(data);
}
else if (SEND_FLAG == 2)//datahub推送
{
my_datahub_send(data);
}
else if (SEND_FLAG == 3)//rocketmq推送lnk10-11
{
my_rocketmq_send(data);
}
else //未配置 默认mq推送
{
my_rocketmq_send(data);
}
printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++,
QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data());
}
//清空计数器
g_mqproducer_blocked_times =0;
QThread::msleep(10); // 避免 CPU 空转lnk20250326
}
//线程结束摧毁生产者
ShutdownAndDestroyProducer();//lnk20241211
}
//lnk20241213补招部分///////////////////////////////////////////////////////////////////////////////////////////////
// 提取 'data' 数组并返回为新的 JSON 字符串 (返回 std::string)
std::string extractDataJson(const char* inputJson) {
// 解析输入 JSON 字符串
cJSON* root = cJSON_Parse(inputJson);
if (root == NULL) {
std::cerr << "Error parsing JSON" << std::endl;
return "";
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return "";
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return "";
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return "";
}
//添加guid
// 提取 "guid" 部分
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if (guidstr == NULL || guidstr->type != cJSON_String) {
std::cerr << "'guid' is missing or is not an array" << std::endl;
cJSON_Delete(root);
return "";
}
//guid回复
std::string guid = guidstr->valuestring;
send_reply_to_kafka(guid,"1","收到补招指令");
// 提取 "data" 部分
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data == NULL || data->type != cJSON_Array) {
std::cerr << "'data' is missing or is not an array" << std::endl;
cJSON_Delete(root);
return "";
}
// 创建新的 JSON 数组对象,只包含 "data" 部分
cJSON* newJson = cJSON_CreateArray(); // 创建一个新的数组
// 将 "data" 数组中的元素逐个添加到新数组中
cJSON* dataItem = NULL;
cJSON_ArrayForEach(dataItem, data) {
cJSON_AddItemToArray(newJson, cJSON_Duplicate(dataItem, 1));
}
// 将新的 JSON 数组转换为字符串
char* newJsonString = cJSON_Print(newJson);
if (newJsonString == NULL) {
std::cerr << "Error printing new JSON" << std::endl;
cJSON_Delete(root);
cJSON_Delete(newJson);
return "";
}
// 转换为 std::string 类型
std::string result(newJsonString);
// 清理内存
free(newJsonString);
cJSON_Delete(root);
cJSON_Delete(newJson);
return result; // 返回 std::string 类型的结果
}
//实时数据部分//////////////////////////////////////////////////////////////////////////////////////////////////////////
// 提取 JSON 消息中的相关字段
bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::string& line, bool& realData, bool& soeData, int& limit)
{
// 解析 JSON 数据
cJSON* root = cJSON_Parse(body.c_str());
if (root == NULL) {
std::cerr << "Failed to parse JSON message." << std::endl;
return false;
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return false;
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return false;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return false;
}
// 提取字段
cJSON* devSeriesItem = cJSON_GetObjectItem(messageBody, "devSeries");
cJSON* lineItem = cJSON_GetObjectItem(messageBody, "line");
cJSON* realDataItem = cJSON_GetObjectItem(messageBody, "realData");
cJSON* soeDataItem = cJSON_GetObjectItem(messageBody, "soeData");
cJSON* limitItem = cJSON_GetObjectItem(messageBody, "limit");
//添加guid
std::string guid;
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if(guidstr)guid = guidstr->valuestring;
if (devSeriesItem && lineItem && realDataItem && soeDataItem && limitItem) {
devSeries = devSeriesItem->valuestring;
line = lineItem->valuestring;
realData = realDataItem->valueint;
soeData = soeDataItem->valueint;
limit = limitItem->valueint;
//回复消息
//执行结果直接看实时数据不需要再回复1是收到消息
send_reply_to_kafka(guid,"1","收到三秒数据指令");
} else {
std::cerr << "Missing expected fields in JSON message." << std::endl;
cJSON_Delete(root);
return false;
}
cJSON_Delete(root); // 清理 JSON 对象
return true;
}
// 构造 XML 内容的函数新建和删除
std::string createnewXmlContent(int devindex, int mpindex, bool realData, bool soeData, int limit)
{
std::ostringstream xmlContent;
xmlContent << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n"
<< "<Trigger3S>\n"
<< " <New>\n"
<< " <Trigger Line=\"" << mpindex << "\" "
<< "RealData=\"" << (realData ? "true" : "false") << "\" "
<< "DevSeries=\"" << devindex << "\" "
<< "Limit=\"" << limit << "\" "
<< "Count=\"0\" "
<< "SOEData=\"" << (soeData ? "true" : "false") << "\"/>\n"
<< " </New>\n"
<< "</Trigger3S>\n";
return xmlContent.str();
}
std::string createdeleteXmlContent(int devindex, int mpindex)
{
std::ostringstream xmlContent;
xmlContent << "<?xml version=\"1.0\" encoding=\"gb2312\"?>\n"
<< "<Trigger3S>\n"
<< " <Delete>\n"
<< " <Trigger Line=\"" << mpindex << "\" "
<< "RealData=\"false\" "
<< "DevSeries=\"" << devindex << "\" "
<< "Limit=\"0\" "
<< "Count=\"0\" "
<< "SOEData=\"false\"/>\n"
<< " </Delete>\n"
<< "</Trigger3S>\n";
return xmlContent.str();
}
// 写入 XML 内容到文件的函数
bool writeToFile(const std::string& filePath, const std::string& xmlContent)
{
// 打开文件流以写入 XML 内容
std::ofstream outFile(filePath.c_str()); // 使用 c_str() 转换为 const char*
if (outFile.is_open()) {
outFile << xmlContent; // 写入内容
outFile.close();
std::cout << "XML file created at: " << filePath << std::endl;
return true;
} else {
std::cerr << "Failed to open file for writing: " << filePath << std::endl;
return false;
}
}
// 创建并写入新的 XML 文件的主函数
bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type)
{
std::string xmlContent = "";
std::string directory = "";
std::string filePath = "";
if(type == "new"){
// 构造 XML 内容
xmlContent = createnewXmlContent(devindex, mpindex, realData, soeData, limit);
// 设置文件路径
directory = "../etc/trigger3s/";
filePath = directory + "newtrigger.xml";
}
else if(type == "delete"){
// 构造 XML 内容
xmlContent = createdeleteXmlContent(devindex, mpindex);
// 设置文件路径
directory = "../etc/trigger3s/";
filePath = directory + "deletetrigger.xml";
}
else{
std::cerr << "Failed to create xmlfile,type error: " << std::endl;
return false;
}
// 创建目录(如果不存在)
if (system(("mkdir -p " + directory).c_str()) != 0) {
std::cerr << "Failed to create directory: " << directory << std::endl;
return false;
}
// 将 XML 内容写入文件
return writeToFile(filePath, xmlContent);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//lnk20250108进程更新部分
// 用于关闭进程监听的端口
extern int server_socket; //Web Socket服务端实例
void close_listening_socket() {
if (server_socket != -1) {
// 关闭socket
close(server_socket);
std::cout << "Server socket closed successfully!" << std::endl;
server_socket = -1; // 重置 server_socket
} else {
std::cout << "No server socket to close!" << std::endl;
}
}
//用于校验ip格式
bool isValidIP(const std::string &ip) {
std::vector<std::string> parts;
std::stringstream ss(ip);
std::string part;
// 使用 "." 作为分隔符将 IP 地址分割成各部分
while (getline(ss, part, '.')) {
parts.push_back(part);
}
// IP 地址必须有 4 部分
if (parts.size() != 4) {
return false;
}
// 校验每一部分是否为合法的数字且在 0 到 255 之间
for (size_t i = 0; i < parts.size(); ++i) {
// 校验每部分是否为数字
for (size_t j = 0; j < parts[i].size(); ++j) {
if (!isdigit(parts[i][j])) {
return false;
}
}
// 转换为整数并检查是否在有效范围内
int num = atoi(parts[i].c_str());
if (num < 0 || num > 255) {
return false;
}
// 检查是否有前导零(如 01、001 等)
if (parts[i].length() > 1 && parts[i][0] == '0') {
return false;
}
}
return true;
}
//执行脚本控制进程
void execute_bash(string fun,int process_num,string type)
{
// 为 char 数组分配足够的空间
char p_num_str[20];
// 使用 sprintf 转换
std::sprintf(p_num_str, "%d", process_num);
const char* script = "/FeProject/bin/set_process.sh";//使用setsid防止端口占用
const char* param1 = fun.c_str();
const char* param2 = p_num_str;
const char* param3 = type.c_str();
// 构造完整的命令
char command[256];
snprintf(command, sizeof(command), "%s %s %s %s &", script, param1, param2, param3);
std::cout << "command:" << command <<std::endl;
// 执行命令
system(command);
}
//执行脚本控制进程
void execute_bash_debug(string fun,string ip,string type,int proindex)
{
const char* script = "/FeProject/bin/set_debug.sh";//使用setsid防止端口占用
const char* param1 = fun.c_str();
const char* param2 = ip.c_str();
const char* param3 = type.c_str();
// 将 proindex 转换为字符串
char param4[32];
snprintf(param4, sizeof(param4), "%d", proindex);
// 构造完整的命令
char command[256];
snprintf(command, sizeof(command), "%s %s %s %s %s &", script, param1, param2, param3,param4);
std::cout << "command:" << command <<std::endl;
// 执行命令
system(command);
}
int parse_set(const std::string& json_str) {
// 解析 JSON 字符串
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return 1;
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return 1;
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return 1;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 获取 guid 字段
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 根据 guid 字段回复消息
std::string guid = guidstr->valuestring;
// 获取 code 字段
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 根据 code 字段值执行不同的解析逻辑
std::string code_str = code->valuestring;
cJSON* processNo = cJSON_GetObjectItem(messageBody, "processNo");
if (processNo == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
//判断是不是自己进程号:
int index_value = processNo->valueint;
cJSON* funtion = cJSON_GetObjectItem(messageBody, "fun");
if (funtion == nullptr) {
std::cout << "Missing 'fun' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
std::string fun = funtion->valuestring;
cJSON* front = cJSON_GetObjectItem(messageBody, "frontType");
if (front == nullptr) {
std::cout << "Missing 'frontType' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
std::string frontType = front->valuestring;
if (index_value != g_front_seg_index && g_front_seg_index != 0) {
std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return 0;
}
//进程号为0或者进程号匹配上
std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl;
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理topic:%s_%s的进程控制消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_SET.c_str());
if (code_str == "set_process") {
cJSON* num = cJSON_GetObjectItem(messageBody, "processNum");
if (num == nullptr) {
std::cout << "Missing 'processNum' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
int processNum = num->valueint;
//校验数据
if((fun == "reset" || fun == "add") &&
(processNum >=1 && processNum < 10) &&
(frontType == "stat" || frontType == "recall" || frontType == "all")){
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
// 调用执行脚本函数
if(fun == "reset"){
close_listening_socket();
}
execute_bash(fun, processNum, frontType);
DIY_WARNLOG_CODE("process",LOG_CODE_PROCESS_CONTROL,"【WARN】前置的%s%d号进程执行指令:%s,reset表示重启所有进程,add表示添加进程",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
//脚本在3秒后执行
//回复消息
send_reply_to_kafka(guid,"1","收到重置进程指令,重启所有进程!");
//上送日志
std::cout << "this msg should only execute once" <<std::endl;
}
else{
std::cout << "only cfg_stat_data index 1 can control process,this process not handle this msg" << std::endl;
}
}
else if(fun == "delete"){
//等待一会后退出进程
MVL_LOG_ACSE0("MYLOG: recive delete msg, so exit to restart ");
//回复消息
send_reply_to_kafka(guid,"1","收到删除进程指令,这个进程将会重启 ");
//上送日志
DIY_WARNLOG_CODE("process",LOG_CODE_PROCESS_CONTROL,"【WARN】前置的%s%d号进程执行指令:%s,即将重启",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
apr_sleep(apr_time_from_sec(10));
::_exit(-1039); //进程退出
}
else{
std::cout << "param is not executable" <<std::endl;
}
}
else if (code_str == "set_debug"){
if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){
std::cout << "cfg_stat_data process" << g_front_seg_index <<" handle this msg" << std::endl;
cJSON* onlyip = cJSON_GetObjectItem(messageBody, "ip");
if (onlyip == nullptr) {
std::cout << "Missing 'ip' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
std::string ip = onlyip->valuestring;
cJSON* index_item = cJSON_GetObjectItem(messageBody, "proindex");
if (index_item == nullptr) {
std::cout << "Missing 'proindex' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
int proindex = index_item->valueint;
std::cout << "proindex is :" << proindex <<std::endl;
//校验数据
if((fun == "start" || fun == "delete") &&
isValidIP(ip) &&
(frontType == "stat" || frontType == "recall" || frontType == "realTime" || frontType == "comtrade") &&
(proindex >= 10 && proindex < 100)){ //单连测试用的进程号应该大于10小于100
execute_bash_debug(fun, ip, frontType,proindex);
DIY_WARNLOG("process","【WARN】前置的%s%d号进程执行指令:%s,start开启单连进程,delete杀死单连进程",get_front_msg_from_subdir(), g_front_seg_index,fun.c_str());
}
else{
std::cout << "param is not executable" <<std::endl;
}
std::cout << "this msg should only execute once" <<std::endl;
}
else{
std::cout << "only cfg_stat_data index 1 can control process,this process not handle this msg" << std::endl;
}
}
else{
std::cout << "set process code str error" <<std::endl;
}
// 释放 JSON 对象
cJSON_Delete(root);
return 0;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
//lnk20250103台账更新部分
// 准备更新内容并生成 XML 字符串
// 添加缩进的函数
void add_indent(std::stringstream& stream, int level) {
for (int i = 0; i < level; ++i) {
stream << " "; // 每一级缩进 2 个空格
}
}
std::string prepare_update(const std::string& code_str, const terminal& json_data,const std::string& guid) //添加guid
{
std::cout << "prepare update" << std::endl;
std::stringstream xmlStream;
int indentLevel = 0; // 缩进级别
// 根节点
add_indent(xmlStream, indentLevel);
xmlStream << "<ledger_update>" << std::endl;
indentLevel++;
// 添加 guid 节点
add_indent(xmlStream, indentLevel);
xmlStream << "<guid>" << guid << "</guid>" << std::endl;
if (code_str == "ledger_modify" || code_str == "add_terminal") {
// 如果是 modify 类型
if (code_str == "ledger_modify") {
add_indent(xmlStream, indentLevel);
xmlStream << "<modify>" << std::endl;
indentLevel++;
}
else {
add_indent(xmlStream, indentLevel);
xmlStream << "<add>" << std::endl;
indentLevel++;
}
// 添加数据部分
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalData>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<ip>" << json_data.addr_str << "</ip>" << std::endl; // Assuming `addr_str` for IP
add_indent(xmlStream, indentLevel);
xmlStream << "<devType>" << json_data.dev_type << "</devType>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<maintName>" << json_data.maint_name << "</maintName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<orgName>" << json_data.org_name << "</orgName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<port>" << json_data.port << "</port>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<stationName>" << json_data.station_name << "</stationName>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalCode>" << json_data.terminal_code << "</terminalCode>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<updateTime>" << json_data.timestamp << "</updateTime>" << std::endl; // Assuming `timestamp`
add_indent(xmlStream, indentLevel);
xmlStream << "<manufacturer>" << json_data.tmnl_factory << "</manufacturer>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << json_data.tmnl_status << "</status>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<series>" << json_data.dev_series << "</series>" << std::endl;
//lnk20250210
add_indent(xmlStream, indentLevel);
xmlStream << "<processNo>" << json_data.processNo << "</processNo>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<devKey>" << json_data.dev_key << "</devKey>" << std::endl;
// monitorData 部分
for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) {
const monitor& monitor = json_data.line[i];
add_indent(xmlStream, indentLevel);
xmlStream << "<monitorData" << (i + 1) << ">" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << monitor.monitor_id << "</id>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<name>" << monitor.monitor_name << "</name>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<lineNo>" << monitor.logical_device_seq << "</lineNo>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<voltageLevel>" << monitor.voltage_level << "</voltageLevel>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<ptType>" << monitor.terminal_connect << "</ptType>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<timestamp>" << monitor.timestamp << "</timestamp>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminal_code>" << monitor.terminal_code << "</terminal_code>" << std::endl;
add_indent(xmlStream, indentLevel);
xmlStream << "<status>" << monitor.status << "</status>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</monitorData>" << std::endl;
}
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</terminalData>" << std::endl;
// 结束 modify 或 add 标签
if (code_str == "ledger_modify") {
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</modify>" << std::endl;
}
else {
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</add>" << std::endl;
}
} else if (code_str == "delete_terminal") {
// 如果是 delete 类型
add_indent(xmlStream, indentLevel);
xmlStream << "<delete>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<terminalData>" << std::endl;
indentLevel++;
add_indent(xmlStream, indentLevel);
xmlStream << "<id>" << json_data.terminal_id << "</id>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</terminalData>" << std::endl;
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</delete>" << std::endl;
}
else {
std::cerr << "code_str error" << std::endl;
return "";
}
// 结束根节点
indentLevel--;
add_indent(xmlStream, indentLevel);
xmlStream << "</ledger_update>" << std::endl;
return xmlStream.str(); // 返回构造的 XML 字符串
}
// 函数将string字符串转换为整数
int StringToInt(const std::string& str) {
std::stringstream ss(str);
int number;
ss >> number; // 从字符串流中读取整数
// 检查是否转换成功
if (ss.fail()) {
std::cerr << "Conversion failed!" << std::endl;
return 0; // 或者你可以选择返回一个标识失败的值,如-1
}
return number;
}
// 解析 JSON 字符串并执行相应操作
int parse_log(const std::string& json_str) {
// 解析 JSON 字符串
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return 1;
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return 1;
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return 1;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 获取 guid 字段
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 根据 guid 字段回复消息
std::string guid = guidstr->valuestring;
// 获取 code 字段
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 根据 code 字段值执行不同的解析逻辑
std::string code_str = code->valuestring;
//获取进程号
cJSON* process = cJSON_GetObjectItem(messageBody, "processNo");
if (process == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
//判断是不是自己进程号:
int processNo = process->valueint;
// 获取 id 字段
cJSON* idstr = cJSON_GetObjectItem(messageBody, "id");
if (idstr == nullptr) {
std::cout << "Missing 'id' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
std::string id = idstr->valuestring;
// 获取 level 字段
cJSON* levelstr = cJSON_GetObjectItem(messageBody, "level");
if (levelstr == nullptr) {
std::cout << "Missing 'level' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
std::string level = levelstr->valuestring;
// 获取 grade 字段
cJSON* gradestr = cJSON_GetObjectItem(messageBody, "grade");
if (gradestr == nullptr) {
std::cout << "Missing 'grade' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
std::string grade = gradestr->valuestring;
// 获取 logtype 字段
cJSON* logtypestr = cJSON_GetObjectItem(messageBody, "logtype");
if (logtypestr == nullptr) {
std::cout << "Missing 'logtype' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
std::string logtype = logtypestr->valuestring;
// 获取 frontType 字段
cJSON* frontTypestr = cJSON_GetObjectItem(messageBody, "frontType");
if (frontTypestr == nullptr) {
std::cout << "Missing 'frontType' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
std::string frontType = frontTypestr->valuestring;
if (processNo != g_front_seg_index) {
std::cout << "msg index:"<< processNo <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return 0;
}
if (frontType != subdir) {
std::cout << "msg frontType:"<< frontType <<"doesnt match self frontType:" << subdir << std::endl;
cJSON_Delete(root);
return 0;
}
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理日志上送消息",get_front_msg_from_subdir(), g_front_seg_index);
//进程号和匹配上
std::cout << "msg index:"<< processNo <<" self index:" << g_front_seg_index << std::endl;
std::cout << "msg frontType:"<< frontType <<" self frontType:" << subdir << std::endl;
//回复消息
send_reply_to_kafka(guid,"1","收到实时日志指令");
if (code_str == "set_log") {
//校验数据
if((level == "terminal" || level == "measurepoint") &&
(grade == "NORMAL" || grade == "DEBUG") &&
(logtype == "com" || logtype == "data") &&
(!id.empty() && !is_blank(id))){
//开启开关
process_log_command(id, level, grade, logtype);
}
else{
std::cout << "type doesnt match" <<std::endl;
//记录warm
}
std::cout << "this msg should only execute once" <<std::endl;
}
// 释放 JSON 对象
cJSON_Delete(root);
return 0;
}
// 台账更新不区分功能
int parse_control(const std::string& json_str, const std::string& output_dir) {
// 解析 JSON 字符串
cJSON* root = cJSON_Parse(json_str.c_str());
if (root == nullptr) {
std::cout << "Error parsing JSON." << std::endl;
return 1;
}
// 提取 "messageBody" 部分
cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody");
if (messageJson == NULL || messageJson->type != cJSON_String) {
std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl;
cJSON_Delete(root);
return 1;
}
// 解析 messageBody 中的 JSON 字符串
const char* messageBodyStr = messageJson->valuestring;
if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) {
std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl;
cJSON_Delete(root);
return 1;
}
cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串
if (messageBody == NULL) {
std::cerr << "Failed to parse 'messageBody' JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 获取 code 字段
cJSON* code = cJSON_GetObjectItem(messageBody, "code");
if (code == nullptr) {
std::cout << "Missing 'code' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 根据 code 字段值执行不同的解析逻辑
std::string code_str = code->valuestring;
//获取进程号
cJSON* process = cJSON_GetObjectItem(messageBody, "processNo");
if (process == nullptr) {
std::cout << "Missing 'processNo' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
//判断是不是自己进程号:
int process_No = process->valueint;
// 获取 guid 字段
cJSON* guidstr = cJSON_GetObjectItem(messageBody, "guid");
if (guidstr == nullptr) {
std::cout << "Missing 'guid' in JSON." << std::endl;
cJSON_Delete(root);
return 1;
}
// 根据 guid 字段回复消息
std::string guid = guidstr->valuestring;
//进程号为0的进程处理所有台账更新消息
if (process_No != g_front_seg_index && g_front_seg_index !=0) {
std::cout << "msg index:"<< process_No <<"doesnt match self index:" << g_front_seg_index << std::endl;
cJSON_Delete(root);
return 0;
}
//进程号为0或者进程号匹配上
std::cout << "msg index:"<< process_No <<" self index:" << g_front_seg_index << std::endl;
//记录日志
DIY_INFOLOG("process","【NORMAL】前置的%s%d号进程处理topic:%s_%s的台账更新消息",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_UD.c_str());
//匹配后响应收到台账更新消息
//除了回复收到消息,执行结束后还要回复结果
send_reply_to_kafka(guid,"1","收到台账更新指令");
if (code_str == "add_terminal" || code_str == "ledger_modify") {
std::cout << "add or update ledger" << std::endl;
// 解析 add_terminal 或 ledger_modify
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
terminal json_data;
// 填充 terminal_dev 的数据
cJSON* id = cJSON_GetObjectItem(item, "id"); // terminal_id
if (id && id->type == cJSON_String)
std::strncpy(json_data.terminal_id, id->valuestring, sizeof(json_data.terminal_id) - 1);
else
std::strncpy(json_data.terminal_id, "N/A", sizeof(json_data.terminal_id) - 1);
cJSON* name = cJSON_GetObjectItem(item, "name"); // terminal_code
if (name && name->type == cJSON_String)
std::strncpy(json_data.terminal_code, name->valuestring, sizeof(json_data.terminal_code) - 1);
else
std::strncpy(json_data.terminal_code, "N/A", sizeof(json_data.terminal_code) - 1);
cJSON* org_name = cJSON_GetObjectItem(item, "org_name"); // org_name
if (org_name && org_name->type == cJSON_String)
std::strncpy(json_data.org_name, org_name->valuestring, sizeof(json_data.org_name) - 1);
else
std::strncpy(json_data.org_name, "N/A", sizeof(json_data.org_name) - 1);
cJSON* maint_name = cJSON_GetObjectItem(item, "maint_name"); // maint_name
if (maint_name && maint_name->type == cJSON_String)
std::strncpy(json_data.maint_name, maint_name->valuestring, sizeof(json_data.maint_name) - 1);
else
std::strncpy(json_data.maint_name, "N/A", sizeof(json_data.maint_name) - 1);
cJSON* station_name = cJSON_GetObjectItem(item, "stationName"); // station_name
if (station_name && station_name->type == cJSON_String)
std::strncpy(json_data.station_name, station_name->valuestring, sizeof(json_data.station_name) - 1);
else
std::strncpy(json_data.station_name, "N/A", sizeof(json_data.station_name) - 1);
cJSON* manufacturer = cJSON_GetObjectItem(item, "manufacturer"); // tmnl_factory
if (manufacturer && manufacturer->type == cJSON_String)
std::strncpy(json_data.tmnl_factory, manufacturer->valuestring, sizeof(json_data.tmnl_factory) - 1);
else
std::strncpy(json_data.tmnl_factory, "N/A", sizeof(json_data.tmnl_factory) - 1);
cJSON* status = cJSON_GetObjectItem(item, "status"); // tmnl_status
if (status && status->type == cJSON_String)
std::strncpy(json_data.tmnl_status, status->valuestring, sizeof(json_data.tmnl_status) - 1);
else
std::strncpy(json_data.tmnl_status, "N/A", sizeof(json_data.tmnl_status) - 1);
cJSON* dev_type = cJSON_GetObjectItem(item, "devType"); // dev_type
if (dev_type && dev_type->type == cJSON_String)
std::strncpy(json_data.dev_type, dev_type->valuestring, sizeof(json_data.dev_type) - 1);
else
std::strncpy(json_data.dev_type, "N/A", sizeof(json_data.dev_type) - 1);
cJSON* dev_key = cJSON_GetObjectItem(item, "devKey"); // dev_key
if (dev_key && dev_key->type == cJSON_String)
std::strncpy(json_data.dev_key, dev_key->valuestring, sizeof(json_data.dev_key) - 1);
else
std::strncpy(json_data.dev_key, "N/A", sizeof(json_data.dev_key) - 1);
cJSON* dev_series = cJSON_GetObjectItem(item, "series"); // dev_series
if (dev_series && dev_series->type == cJSON_String)
std::strncpy(json_data.dev_series, dev_series->valuestring, sizeof(json_data.dev_series) - 1);
else
std::strncpy(json_data.dev_series, "N/A", sizeof(json_data.dev_series) - 1);
//lnk20250210台账进程号
cJSON* processNo = cJSON_GetObjectItem(item, "processNo"); // processNo转为字符串
if (processNo && processNo->type == cJSON_Number) snprintf(json_data.processNo, sizeof(json_data.processNo), "%d", processNo->valueint);
else strncpy(json_data.processNo, "N/A", sizeof(json_data.processNo) - 1);
cJSON* ip = cJSON_GetObjectItem(item, "ip"); // addr_str
if (ip && ip->type == cJSON_String)
std::strncpy(json_data.addr_str, ip->valuestring, sizeof(json_data.addr_str) - 1);
else
std::strncpy(json_data.addr_str, "N/A", sizeof(json_data.addr_str) - 1);
cJSON* port = cJSON_GetObjectItem(item, "port"); // port
if (port && port->type == cJSON_String)
std::strncpy(json_data.port, port->valuestring, sizeof(json_data.port) - 1);
else
std::strncpy(json_data.port, "N/A", sizeof(json_data.port) - 1);
cJSON* updateTime = cJSON_GetObjectItem(item, "updateTime"); // timestamp
if (updateTime && updateTime->type == cJSON_String)
std::strncpy(json_data.timestamp, updateTime->valuestring, sizeof(json_data.timestamp) - 1);
else
std::strncpy(json_data.timestamp, "N/A", sizeof(json_data.timestamp) - 1);
// monitorData 解析,填充到 line 数组中
cJSON* monitorData = cJSON_GetObjectItem(item, "monitorData");
if (monitorData != nullptr && monitorData->type == cJSON_Array) {
int monitorData_size = cJSON_GetArraySize(monitorData);
for (int j = 0; j < monitorData_size && j < 10; j++) { // 最多 10 个监测点
cJSON* monitor_item = cJSON_GetArrayItem(monitorData, j);
monitor monitor_data;
cJSON* monitor_id = cJSON_GetObjectItem(monitor_item, "id"); // monitor_id
if (monitor_id && monitor_id->type == cJSON_String)
std::strncpy(monitor_data.monitor_id, monitor_id->valuestring, sizeof(monitor_data.monitor_id) - 1);
else
std::strncpy(monitor_data.monitor_id, "N/A", sizeof(monitor_data.monitor_id) - 1);
cJSON* monitor_name = cJSON_GetObjectItem(monitor_item, "name"); // monitor_name
if (monitor_name && monitor_name->type == cJSON_String)
std::strncpy(monitor_data.monitor_name, monitor_name->valuestring, sizeof(monitor_data.monitor_name) - 1);
else
std::strncpy(monitor_data.monitor_name, "N/A", sizeof(monitor_data.monitor_name) - 1);
cJSON* voltage_level = cJSON_GetObjectItem(monitor_item, "voltageLevel"); // voltage_level
if (voltage_level && voltage_level->type == cJSON_String)
std::strncpy(monitor_data.voltage_level, voltage_level->valuestring, sizeof(monitor_data.voltage_level) - 1);
else
std::strncpy(monitor_data.voltage_level, "N/A", sizeof(monitor_data.voltage_level) - 1);
cJSON* monitor_status = cJSON_GetObjectItem(monitor_item, "status"); // status
if (monitor_status && monitor_status->type == cJSON_String)
std::strncpy(monitor_data.status, monitor_status->valuestring, sizeof(monitor_data.status) - 1);
else
std::strncpy(monitor_data.status, "N/A", sizeof(monitor_data.status) - 1);
cJSON* lineNo = cJSON_GetObjectItem(monitor_item, "lineNo"); // logical_device_seq
if (lineNo && lineNo->type == cJSON_String)
std::strncpy(monitor_data.logical_device_seq, lineNo->valuestring, sizeof(monitor_data.logical_device_seq) - 1);
else
std::strncpy(monitor_data.logical_device_seq, "N/A", sizeof(monitor_data.logical_device_seq) - 1);
cJSON* ptType = cJSON_GetObjectItem(monitor_item, "ptType"); // terminal_connect
if (ptType && ptType->type == cJSON_String)
std::strncpy(monitor_data.terminal_connect, ptType->valuestring, sizeof(monitor_data.terminal_connect) - 1);
else
std::strncpy(monitor_data.terminal_connect, "N/A", sizeof(monitor_data.terminal_connect) - 1);
std::strncpy(monitor_data.timestamp, json_data.timestamp, sizeof(monitor_data.timestamp) - 1);
std::strncpy(monitor_data.terminal_code, json_data.terminal_code, sizeof(monitor_data.terminal_code) - 1);
// 填充到 line 数组
json_data.line[j] = monitor_data;
}
}
print_terminal(&json_data);
// 准备 XML 内容并写入文件
std::string xmlContent = prepare_update(code_str, json_data,guid);//添加guid20250506
if (xmlContent != "") {
std::cout << "write to xml in /FeProject/etc/ledger_update" <<std::endl;
char nodeid[20];
std::sprintf(nodeid, "%u", g_node_id); // "%u" 用于 unsigned int
std::string nodeid_str(nodeid);
std::string frontindex_str = intToString(g_front_seg_index);
std::string file_name = output_dir + "/" + nodeid_str + "_" + frontindex_str + "_" + json_data.terminal_id + "_" + code_str + ".xml";
writeToFile(file_name, xmlContent);
}
}
}
}
else if (code_str == "delete_terminal") {
std::cout << "delete ledger" <<std::endl;
// 解析 delete_terminal
cJSON* data = cJSON_GetObjectItem(messageBody, "data");
if (data != nullptr && data->type == cJSON_Array) {
int data_size = cJSON_GetArraySize(data);
for (int i = 0; i < data_size; i++) {
cJSON* item = cJSON_GetArrayItem(data, i);
// 只解析 id 字段
cJSON* id = cJSON_GetObjectItem(item, "id");
if (id != nullptr) {
terminal json_data;
std::strncpy(json_data.terminal_id, cJSON_GetObjectItem(item, "id")->valuestring, sizeof(json_data.terminal_id) - 1);
// 准备 XML 内容并写入文件
std::string xmlContent = prepare_update(code_str, json_data,guid);//添加guid20250506
if(xmlContent != ""){
char nodeid[20];
std::sprintf(nodeid, "%u", g_node_id); // "%u" 用于 unsigned int
std::string nodeid_str(nodeid);
std::string frontindex_str = intToString(g_front_seg_index);
std::string file_name = output_dir + "/" + nodeid_str + "_" + frontindex_str + "_" + json_data.terminal_id + "_delete_terminal.xml";
writeToFile(file_name, xmlContent); //写文件加上guid读文件
}
}
}
}
}
else{
std::cout << "code_str error" <<std::endl;
}
// 释放 JSON 对象
cJSON_Delete(root);
return 0;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
int find_dev_index_from_dev_id(std::string dev_id)
{
ied_t* ied = NULL;
int iedno;
ied_usr_t* ied_usr = NULL;
for (iedno = 0; iedno < g_node->n_clients; iedno++) {
ied = g_node->clients[iedno];
ied_usr = (ied_usr_t*)ied->usr_ext;
if (ied_usr && strcmp(ied_usr->terminal_id, dev_id.c_str()) == 0) {
return ied_usr->dev_idx;
}
}
return 0;
}
int find_mp_index_from_mp_id(std::string line)
{
LD_info_t* LD_info = NULL;
LD_info = find_LD_info_only_from_mp_id((char*)line.c_str());
if(LD_info == NULL){
return 0;
}
else{
return LD_info->line_id;
}
}
int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg)
{
if(INITFLAG != 1)return 1;//防止崩溃
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
//记录日志
DIY_INFOLOG("process","【NORMAL】前置消费topic:%s_%s的实时触发消息",FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RT.c_str());
// 处理消息(例如,打印消息内容)
std::cout << "rt data Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理消费数据
std::string devid, line;
bool realData, soeData;
int limit;
// 解析 JSON 数据
if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) {
std::cerr << "Failed to parse the JSON message." << std::endl;
//记录日志
DIY_ERRORLOG_CODE("process",LOG_CODE_RT_DATA,"【ERROR】前置消费topic:%s_%s的实时触发消息失败,消息的json格式不正确",FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RT.c_str());
return E_RECONSUME_LATER;
}
//mq处理实时数据指令查询台账时添加锁
pthread_mutex_lock(&mtx); std::cout << "rtdata hold lock !!!!!!!!!!!" << std::endl;
int dev_index = find_dev_index_from_dev_id(devid);
int mp_index = find_mp_index_from_mp_id(line);
pthread_mutex_unlock(&mtx); std::cout << "rtdata free lock !!!!!!!!!!!" << std::endl;
if(dev_index == 0 || mp_index == 0){
std::cerr << "dev index or mp index is 0" << std::endl;
return E_RECONSUME_LATER;
}
// 创建 XML 文件
if (!createXmlFile(dev_index, mp_index, realData, soeData, limit,"new")) {
DIY_ERRORLOG_CODE("process",LOG_CODE_RT_DATA,"【ERROR】前置无法创建实时数据触发文件");
std::cerr << "Failed to create the XML file." << std::endl;
return E_RECONSUME_LATER;
}
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg)
{
if(INITFLAG != 1)return 1;//防止崩溃
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
//处理消费数据
std::cout << "ledger update Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理台账更新消息
std::string updatefilepath = "/home/pq/FeProject/etc/ledgerupdate";
if(parse_control(body,updatefilepath)){
DIY_ERRORLOG_CODE("process",LOG_CODE_LEDGER_UPDATE,"【ERROR】前置的%s%d号进程处理topic:%s_%s的台账更新消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_UD.c_str());
}
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg)
{
if(INITFLAG != 1)return 1;//防止崩溃
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
//处理消费数据
std::cout << "process Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理进程更新消息
if(parse_set(body)){
DIY_ERRORLOG_CODE("process",LOG_CODE_PROCESS_CONTROL,"【ERROR】前置的%s%d号进程处理topic:%s_%s的进程控制消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_SET.c_str());
}
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
int myMessageCallbacklog(CPushConsumer* consumer, CMessageExt* msg)
{
if(INITFLAG != 1)return 1;//防止崩溃
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
//处理消费数据
std::cout << "process Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理进程更新消息
if(parse_log(body)){
DIY_ERRORLOG_CODE("process",LOG_CODE_LOG_REQUEST,"【ERROR】前置的%s%d号进程处理topic:%s_%s的日志上送消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_LOG.c_str());
}
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg)
{
if(INITFLAG != 1)return 1;//防止崩溃
//调试
std::cout << "myMessageCallbackrecall"<< std::endl;
if (msg == NULL) {
std::cerr << "Received null message." << std::endl;
return E_RECONSUME_LATER;
}
const char* body = GetMessageBody(msg);
const char* key = GetMessageKeys(msg);
if (body == NULL) {
std::cerr << "Message body is NULL." << std::endl;
return E_RECONSUME_LATER;
}
else{
// 处理消息(例如,打印消息内容)
std::cout << "recall Callback received message: " << body << std::endl;
if (key) {
std::cout << "Message Key: " << key << std::endl;
}
else {
std::cout << "Message Key: N/A" << std::endl;
}
//处理消费数据
std::string result = extractDataJson(body); // 使用 std::string 代替 malloc
//调试
std::cout << "extractDataJson:"<< result.c_str() <<std::endl;
if (!result.empty()) {
pthread_mutex_lock(&mtx); std::cout << "recall mq hold lock !!!!!!!!!!!" << std::endl;
recall_json_handle(result.c_str()); // 使用 c_str() 获取 const char* 类型
pthread_mutex_unlock(&mtx); std::cout << "recall mq free lock !!!!!!!!!!!" << std::endl;
}
else{
std::cerr << "recall data is NULL." << std::endl;
DIY_ERRORLOG_CODE("process",LOG_CODE_RECALL,"【ERROR】前置的%s%d号进程处理topic:%s_%s的补招触发消息失败,消息的json结构不正确",get_front_msg_from_subdir(), g_front_seg_index,FRONT_INST.c_str(),G_MQCONSUMER_TOPIC_RC.c_str());
}
}
// 根据业务逻辑决定返回状态
return E_CONSUME_SUCCESS;
}
void mqconsumerThread::run()
{
// 配置消费者参数
std::string consumerName = subdir + intToString(g_front_seg_index) + "_start_" + QDateTime::currentDateTime().toString("yyyyMMddhhmmss").toStdString(); // 消费者组ID+启动时间,不消费历史消息
std::string nameServer = G_MQCONSUMER_IPPORT; // NameServer地址
// 定义多个主题、标签及其对应的回调函数
std::vector<Subscription> subscriptions;
// 初始化消费者1 //lnk20241230只有实时进程会订阅实时topic不订阅实时topic的进程无法触发实时数据
if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID){
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata));
}
// 初始化消费者2 //所有进程都会订阅台账更新topic不同功能进程的台账不能互相影响
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate));
// 初始化消费者3 //lnk20241230只有补招进程会订阅补招topic不订阅补招topic的进程无法触发补招数据
if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID){
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall));
}
// 初始化消费者4 //lnk20250108只有稳态进程1会控制reset
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset));
// 初始化消费者5 //所有进程都会订阅日志上送topic不同功能进程的日志上送不能互相影响
subscriptions.push_back(Subscription(std::string(FRONT_INST) + "_" + G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog));
try {
rocketmq_consumer_receive(consumerName, nameServer, subscriptions);
}
catch (const std::exception& e) {
std::cerr << "Exception during consumerUD setup: " << e.what() << std::endl;
}
// 程序运行中,消费者会通过回调处理消息
std::cout << "Consumer is running. " << std::endl;
}
//CZY 2023-08-23 get double class voltage level, if false will return 0;
double get_voltage_level(char voltage_level_char[]) {
try
{
int n = atoi(voltage_level_char);
switch (n)
{
case 1://交流6V
return 0.006;
case 2://交流12V
return 0.012;
case 3://交流24V
return 0.024;
case 4://交流36V
return 0.036;
case 5://交流48V
return 0.048;
case 6://交流110V
return 0.11;
case 7://交流220V
return 0.22;
case 8://交流380V含400V
return 0.38;
case 9://交流660V
return 0.66;
case 10://交流1000V含1140V
return 1;
case 11://交流600V
return 0.6;
case 12://交流750V
return 0.75;
case 13://交流1500V
return 1.5;
case 14://交流2000V
return 2.0;
case 15://交流2500V
return 2.5;
case 20://交流3kV
return 3;
case 21://交流6kV
return 6;
case 22://交流10kV
return 10;
case 23://交流15.75kV
return 15.75;
case 24://交流20kV
return 20;
case 25://交流35kV
return 35;
case 30://交流66kV
return 66;
case 31://交流72.5kV
return 72.5;
case 32://交流110kV
return 110;
case 33://交流220kV
return 220;
case 34://交流330kV
return 330;
case 35://交流500kV
return 500;
case 36://交流750kV
return 750;
case 37://交流1000kV
return 1000;
case 51://直流6V
return 0.006;
case 52://直流12V
return 0.012;
case 53://直流24V
return 0.024;
case 54://直流36V
return 0.036;
case 55://直流48V
return 0.048;
case 56://直流110V
return 0.11;
case 60://直流220V
return 0.22;
case 70://直流600V
return 0.6;
case 71://直流750V
return 0.75;
case 72://直流1500V
return 1.5;
case 73://直流3000V
return 3.0;
case 76://直流35kV
return 35;
case 77://直流30kV
return 30;
case 78://直流50kV
return 50;
case 80://直流120kV
return 120;
case 81://直流125kV
return 125;
case 82://直流400kV
return 400;
case 83://直流500kV
return 500;
case 84://直流660kV
return 660;
case 85://直流800kV
return 800;
case 86://直流1000kV
return 1000;
case 87://直流200kV
return 200;
case 88://直流320kV。
return 320;
default:
return 0;
break;
}
}
catch (const std::exception&)
{
//error
return 0;
}
}
void try_start_kafka_thread()
{
static int kafka_thread_created = 0;
if (!kafka_thread_created) {
myThrd.start();
kafka_thread_created = 1;
}
}
//lnk20241213
void try_start_mqconsumer_thread()
{
static int mqconsumer_thread_created = 0;
if (!mqconsumer_thread_created) {
mqconsumerThrd.start();
mqconsumer_thread_created = 1;
}
}
/////////////////////////////////////////////////////////////////////////
json_block_data json_blkd;
//CZY 2023-08-17 WW 2022年12月6日14:09:08 增加多个ICD支持
//json_block_data json_blkd; //json拼接参数类对象,原有的一个数据对象在多ICD下会出现数据错位问题
//解决方案是将此数据放入LDInfo结构中存储保证一条线路一个json拼接参数类对象
void init_json_block_data(char mp_id[], char voltage_level[], int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_flicker_data_map.insert(mp_id, pdata);
}
pdata = json_flicker_data_map.value(mp_id);
}
else if (flicker_flag == 0) {
if (!json_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_data_map.insert(mp_id, pdata);
}
pdata = json_data_map.value(mp_id);
}
else if (flicker_flag == 2) {
if (!json_pst_data_map.contains(mp_id))
{
pdata = new json_block_data();
json_pst_data_map.insert(mp_id, pdata);
}
pdata = json_pst_data_map.value(mp_id);
}
if (pdata == NULL)
return;
pdata->monitorId = -1;
QString tmp;
tmp.append(mp_id);
pdata->mp_id = tmp;
pdata->func_type = g_node_id;
//flag 是品质, 异常送1 正常送0
pdata->flag = 0; // //剔除标记1不剔除0剔除默认剔除
pdata->mms_str_map.clear();
pdata->voltage_level = get_voltage_level(voltage_level); //CZY 2023-08-23 add voltage_level
}
int json_block_create_start(char voltage_level[], char monid_char[], int flicker_flag, char temcode[], int line_id)//WW 2023年3月13日16:38 : 41 多ICD修改
{
try_start_kafka_thread();
init_json_block_data(monid_char, voltage_level, flicker_flag);
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
{
pdata->dev_type.append(temcode);
pdata->monitorId = line_id;
if (strlen(monid_char) != 0) {
QString tmp;
tmp.append(monid_char);
pdata->mp_id = tmp;
}
else {
monid_char = "not define";
QString tmp;
tmp.append(monid_char);
pdata->mp_id = tmp;
}
}
printf("\n\n---------- json_block_create_start: mp_id=%s,voltage_level=%s,line_id=%d \n", monid_char, voltage_level, line_id);
return TRUE;
}
int json_block_create_time(char monid_char[], long long Time, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
pdata->time = Time;
printf("\njson_block_create_time: mp_id=%s,Time=%lld \n", monid_char, Time);
return TRUE;
}
int json_block_create_flag(char monid_char[], int flag, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_pst_data_map.value(monid_char);
}
if (pdata != NULL)
pdata->flag = flag;
printf("\njson_block_create_flag: mp_id=%s,flag=%d \n", monid_char, flag);
return TRUE;
}
int json_block_create_data(char monid_char[], char* mms_str, double v, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
return 0;
pdata = json_pst_data_map.value(monid_char);
}
static int count = 0;
if (pdata != NULL)
{
pdata->mms_str_map.insert(QString::fromAscii(mms_str), v);
if (strstr(mms_str, "MMXU2$MX$PhV"))
printf("---------- json_block_create_data: mp_id= %s ,mms_str=%s value=%fkV----------\n", monid_char, mms_str, v);
}
return TRUE;
}
//lnk2024-8-16添加接线参数
int json_block_create_end(char v_wiring_type[], char monid_char[], int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改
{
json_block_data* pdata;
if (flicker_flag == 1) {
if (!json_flicker_data_map.contains(monid_char))//未查到数据
{
printf("---------- json_block_create_end: mp_id= %s json_flicker_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_flicker_data_map.value(monid_char);
}
else if (flicker_flag == 0)
{
if (!json_data_map.contains(monid_char))//未查到数据
{
printf("---------- json_block_create_end: mp_id= %s json_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_data_map.value(monid_char);
}
else if (flicker_flag == 2)
{
if (!json_pst_data_map.contains(monid_char))//未查到数据
{
printf("---------- json_block_create_end: mp_id= %s json_pst_data_map can't find MonitorId----------\n", monid_char);
return 1;
}
pdata = json_pst_data_map.value(monid_char);
}
//调试用
/*if (pdata != nullptr) {
printf("monitorId: %d\n", pdata->monitorId);
printf("func_type: %d\n", pdata->func_type);
printf("flag: %d\n", pdata->flag);
printf("time: %lld\n", pdata->time);
printf("voltage_level: %f\n", pdata->voltage_level);
printf("mp_id: %s\n", pdata->mp_id.toStdString().c_str());
printf("dev_type: %s\n", pdata->dev_type.toStdString().c_str());
printf("mms_str_map count: %d\n", pdata->mms_str_map.size());
} else {
printf("pdata is NULL\n");
}*/
if (pdata->mms_str_map.count() == 0) {
if (flicker_flag == 1) {
json_flicker_data_map.remove(monid_char);
}
else if (flicker_flag == 0)
{
json_data_map.remove(monid_char);
}
else if (flicker_flag == 2)
{
json_pst_data_map.remove(monid_char);
}
printf("---------- json_block_create_end: pdata->mms_str_map.count() == 0 ----------\n");
return 1;
}
//lnk2024-8-16添加接线参数
int ret = transfer_json_block_data(v_wiring_type, pdata);
if (pdata != NULL)
delete pdata;
if (flicker_flag == 1) {
json_flicker_data_map.remove(monid_char);
}
else if (flicker_flag == 0)
{
json_data_map.remove(monid_char);
}
else if (flicker_flag == 2)
{
json_pst_data_map.remove(monid_char);
}
printf("---------- json_block_create_end: MonitorId= %s ----------\n", monid_char);
return ret;
}
//////////////////////////////////////////////////////////////////////////////
void clear_old_comtrade_files()
{
if (g_node_id != SOE_COMTRADE_BASE_NODE_ID)
return;
QString full_fn_str;
QString dir_name("../comtrade/");
QDir directory_comtrade(dir_name);
QStringList fileNames = directory_comtrade.entryList(QDir::Files | QDir::NoDotAndDotDot, QDir::Time);
if (fileNames.size() <= comtrade_remain_file_num)
return;
for (QStringList::size_type i = comtrade_remain_file_num; i != fileNames.size(); ++i) {
full_fn_str = dir_name + fileNames.at(i);
QFile::remove(full_fn_str);
}
}
/////////////////////////////////////////////
int process_login_verify()
{
int length = 64;
char password[64 + 1];
char* p = NULL;
int count = 0;
char encode_password[256];
const char* passwordConfirm = "1c0e4e104de596846648ba495bd32601";
memset(password, 0, sizeof(password));
printf("Please input password : \n");
p = password;
count = 0;
system("stty -echo");
std::cin.getline(password, 64);
system("stty echo");
password[length] = '\0';
MyGetSM4Code(password, (unsigned char*)"epri.sgcc.com.cn", encode_password);
return (strcmp(encode_password, passwordConfirm));
}
///////////////////////////////////////////
void try_start_socket_thread()
{
static int socket_thread_created = 0;
if (!socket_thread_created) {
socketThrd.start();
socket_thread_created = 1;
}
}
//lnk20241029
void try_start_web_http_thread()
{
static int webhttp_thread_created = 0;
if (!webhttp_thread_created) {
webhttpThrd.start();
webhttp_thread_created = 1;
}
}
void try_start_http_thread()
{
static int http_thread_created = 0;
if (!http_thread_created) {
httpThrd.start();
http_thread_created = 1;
}
}
//lnk20241202
int try_start_mqtest_thread(int argc, char *argv[])
{
//不使用简单的循环线程而是启动一个app不仅执行循环线程而且可以连接输入
//安装qt打印
qInstallMsgHandler(myQtMsgHandler);
QCoreApplication a(argc, argv);
// 创建 QThread 和 Worker 对象
QThread *thread = new QThread();
Worker *worker = new Worker();
// 将 Worker 对象移动到 QThread 中
worker->moveToThread(thread);
// 连接信号和槽
QObject::connect(thread, SIGNAL(started()), worker, SLOT(startServer()));
QObject::connect(worker, SIGNAL(serverError()), thread, SLOT(quit()));
QObject::connect(worker, SIGNAL(serverError()), worker, SLOT(deleteLater()));
QObject::connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
// 启动线程
thread->start();
// 确保在应用退出时,线程也能正确退出
QObject::connect(&a, SIGNAL(aboutToQuit()), thread, SLOT(quit()));
return a.exec();
}
void try_start_ontimer_thread()
{
static int ontimer_thread_created = 0;
if (!ontimer_thread_created) {
onTimerThrd.start();
ontimer_thread_created = 1;
}
}
//WW 2023-08-22 end
///////////////////////////////////////////
//ZW 2024-01-31 补招数据模式优化
static QMap<QString, int> mvl_type_ctrl_map;//ZW 2024-01-31 用于保存单次获取的模型
static int mvl_type_ctrl_map_size;//计数
//添加doname对应的数据模型
void add_mvl_type_ctrl(char doname[], int ctrl)
{
if (!mvl_type_ctrl_map.contains(doname))
{
mvl_type_ctrl_map.insert(doname, ctrl);
}
}
//删除map中所有数据模型
void del_mvl_type_ctrl()
{
for (QMap<QString, int>::iterator it = mvl_type_ctrl_map.begin(); it != mvl_type_ctrl_map.end(); ++it) {
QString key = it.key();
int value = it.value();
mvl_type_id_destroy(value);
}
mvl_type_ctrl_map.clear();
}
int get_mvl_type_ctrl_map_size()
{
return mvl_type_ctrl_map.count();
}
//查找对应doname的数据模型是否存在map中
int sel_mvl_type_ctrl_flag(char doname[])
{
if (mvl_type_ctrl_map.contains(doname))
{
return mvl_type_ctrl_map.value(doname);
}
else
{
return -1;
}
}
//ZW 2024-01-31 end