/** * @file: $RCSfile: save2json.cpp,v $ * @brief: $IEC 61850 Protocol * * @version: $Revision: 1.21 $ * @date: $Date: 2020/10/27 08:51:07 $ * @author: $Author: lizhongming $ * @state: $State: Exp $ * * @latest: $Id: save2json.cpp,v 1.21 2020/10/27 08:51:07 lizhongming Exp $ * */ using namespace std; #include #include #include #include // std::filebuf #include #include #include "qdebug.h" #include #include #include #include //CZY 2023-08-17 WW 2023年3月13日17:21:02 增加多ICD支持 #include #include #include "../mms/db_interface.h" #include "../json/save2json.h" #include "../json/mms_json_inter.h" #include "kafka_producer.h" /*lnk10-11 */ //#include "../include/rocketmq/SimpleProducer.h" #include "../include/rocketmq/CPushConsumer.h" #include #include "../json/cjson.h" //解json #include //创建xml #include //创建xml bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type); extern int recall_json_handle(const char* jstr); extern std::string intToString(int number); //extern int stringToInt(const char* str, int* result); int StringToInt(const std::string& str); extern pthread_mutex_t mtx;//lnk20250115 #ifdef __cplusplus extern "C" { #include "../mms/rdb_client.h" #include "node.h"//lnk20241223 #include "mvl_defs.h" #include "mms_vvar.h" #endif /* __cplusplus */ extern unsigned int g_node_id; extern int g_front_seg_index; extern char subdir[128]; extern int comtrade_remain_file_num; extern node_t* g_node; //lnk20241223 extern LD_info_t* find_LD_info_only_from_mp_id(char* mp_id);//lnk20241223 extern void print_terminal(const terminal* tmnl); #ifdef __cplusplus } #endif #ifndef nullptr #define nullptr NULL #endif extern QMutex kafka_data_list_mutex; extern QList kafka_data_list; extern QMutex oss_data_list_mutex; extern QList oss_data_list; extern int FILE_FLAG; KafkaSendThread myThrd; //WW 2023-08-22 增加数据库线程和WebSokcet线程 SQLExcuteThread sqlThrd; //Sql执行线程类对象 WebSocketThread socketThrd; //Web Socket线程类对象 WebhttpThread webhttpThrd; //Web http线程类对象 lnk202411 httpThread httpThrd; //Web http线程类对象 lnk202411 //mqtestThread mqtestThrd; //mqtest线程 lnk202412 //mqtestThread mqtestThrd(nullptr); //mqtest线程 lnk202412 mqconsumerThread mqconsumerThrd;//mq消费者线程lnk20241213 OnTimerThread onTimerThrd;//定时线程 extern QMutex Sql_data_list_mutex; //Sql执行语句锁 extern QList Sql_data_list; //Sql执行语句链表 extern int g_iOTLFlag; //Sql是否执行标志(0-不执行;1-执行) extern int g_iSqlListSize; //Sql执行语句链表允许最大元素个数 注:Sql链表中元素超过此个数,多出元素需移除! extern int FILE_FLAG; extern int SEND_FLAG; extern char* BROKER_LIST; extern char* TOPIC_STAT; extern char* TOPIC_PST; extern char* TOPIC_PLT; extern char* TOPIC_EVENT; extern char* TOPIC_ALARM; extern char* TOPIC_SNG; extern char* TOPIC_RTDATA;//lnk20241220 extern char* UDS_UPLOAD_URL; extern char g_onlyIP[255]; //直连某个IP,仅仅为方便测试 //WW 2023-08-22 end //lnk20241216添加mq消费者 extern std::string G_MQCONSUMER_IPPORT;//rocketmq ip+port extern std::string G_MQCONSUMER_TOPIC_RT;//topie_realtimedata extern std::string G_MQCONSUMER_TAG_RT;//tag extern std::string G_MQCONSUMER_KEY_RT;//key extern std::string G_MQCONSUMER_TOPIC_UD;//topie_update extern std::string G_MQCONSUMER_TAG_UD;//tag extern std::string G_MQCONSUMER_KEY_UD;//key extern std::string G_MQCONSUMER_TOPIC_RC;//topie_recall extern std::string G_MQCONSUMER_TAG_RC;//tag extern std::string G_MQCONSUMER_KEY_RC;//key extern std::string G_MQCONSUMER_TOPIC_SET;//topie_recall extern std::string G_MQCONSUMER_TAG_SET;//tag extern std::string G_MQCONSUMER_KEY_SET;//key extern std::string G_MQCONSUMER_TOPIC_LOG;//topie_log extern std::string G_MQCONSUMER_TAG_LOG;//tag extern std::string G_MQCONSUMER_KEY_LOG;//key extern std::string G_LOG_TOPIC;//topie extern std::string G_LOG_TAG;//tag extern std::string G_LOG_KEY;//key extern pthread_mutex_t errorListMutex; extern pthread_mutex_t warnListMutex; extern pthread_mutex_t normalListMutex; #define APRTIME_8H (28800000000ULL) #define APRTIME_1H (3600000000ULL) /////////////////////////////////////////////////////////////////////////////// const int MAX_LIST_SIZE = 16; static QMap > real_data_report_map; static QMap json_data_map;//CZY 2023-08-17 ww 2023年3月13日17:23:17扩展Map,用于保存各条线路的数据 static QMap json_flicker_data_map;//CZY 2023-09-11 展Map,用于保存各条线路的闪变数据 static QMap json_pst_data_map;//CZY 2023-09-11 展Map,用于保存各条线路的闪变数据 int urcbRealDataHasReceived(int dev_index, LD_info_t* LD_info, long long Time) { QList& ts_list = real_data_report_map[LD_info->line_id]; bool bFind = ts_list.contains(Time); //实时数据时间链表 if (bFind == false) { ts_list.append(Time); if (ts_list.size() > MAX_LIST_SIZE) ts_list.removeFirst(); //lnk20241223每收到一次实时数据就检查一下数量 int real_report_count = 0; real_report_count = get_real_report_count(LD_info); //调试 std::cout << "real_report_count is" << real_report_count << std::endl; std::cout << "mp limit is" << LD_info->limit << std::endl; if(real_report_count >= LD_info->limit){ std::cout << "real_report_count reach limit!!!"<< std::endl; //生成delete.xml if (!createXmlFile(dev_index, LD_info->line_id, 0, 0, 0,"delete")) { std::cerr << "Failed to create delete XML file!!!." << std::endl; } } return 0; //没有重复数据 } else return 1; //有重复数据 } ////////////////////////////////////////////////////////////////////////// void add_comm_log(char* log_str) { QDateTime now = QDateTime::currentDateTime(); QString level_str = QString("[info]"); QString head_str = QString(""); QString tail_str = QString(""); #ifdef __GNUC__ QString com_log_fn("/usr/local/saslog/"); #else QString com_log_fn("../etc/log/"); #endif if (g_node_id == STAT_DATA_BASE_NODE_ID) com_log_fn += "comm_100_stat.txt"; else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) com_log_fn += "comm_200_3s.txt"; else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) com_log_fn += "comm_300_comtrade.txt"; else if (g_node_id == HIS_DATA_BASE_NODE_ID) com_log_fn += "comm_400_his.txt"; else if (g_node_id == NEW_HIS_DATA_BASE_NODE_ID) { com_log_fn.append(QString("comm_400_his_%1.txt").arg(g_front_seg_index)); } else if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) com_log_fn += "comm_600_recall.txt"; else if (g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) { com_log_fn.append(QString("comm_700_allrecall_%1.txt").arg(g_front_seg_index)); } else com_log_fn += "comm_x00_unknown.txt"; QFile file(com_log_fn); if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append)) return; QTextStream out(&file); out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl; } void add_sng_log(char* log_str) { QDateTime now = QDateTime::currentDateTime(); QString level_str = QString("[info]"); QString head_str = QString(""); QString tail_str = QString(""); #ifdef __GNUC__ QString com_log_fn("/usr/local/saslog/"); #else QString com_log_fn("../etc/log/"); #endif com_log_fn += "sng_kafka_json.txt"; QFile file(com_log_fn); if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append)) return; QTextStream out(&file); out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl; } void add_stat_kafka_json_log(char* log_str) { QDateTime now = QDateTime::currentDateTime(); QString level_str = QString("[info]"); QString head_str = QString(""); QString tail_str = QString(""); #ifdef __GNUC__ QString com_log_fn("/usr/local/saslog/"); #else QString com_log_fn("../etc/log/"); #endif com_log_fn += "stat_kafka_json.txt"; QFile file(com_log_fn); if (!file.open(QIODevice::WriteOnly | QIODevice::Text | QIODevice::Append)) return; QTextStream out(&file); out << (now.toString("yyyy-MM-dd hh:mm:ss") + " " + level_str + " " + QString::fromAscii(log_str)) << endl; } //////////////////////////////////////////////////////////////////////////// /*void TrimLeft(std::string &s) { const std::string &space =" \f\n\t\r\v"; s.erase(0, s.find_first_not_of(space)); } void TrimRight(std::string &s) { const std::string &space =" \f\n\t\r\v"; s.erase(s.find_last_not_of(space) + 1); } void Trim(std::string &s) { const std::string &space =" \f\n\t\r\v"; s.erase(0, s.find_first_not_of(space)); s.erase(s.find_last_not_of(space) + 1); } int is_rpt_Time_exact_hour() { apr_time_t hour_time_t = g_db_info->TimeID[RPT_IDX]/APRTIME_8H*APRTIME_8H; if (g_db_info->TimeID[RPT_IDX]==hour_time_t) return TRUE; else return FALSE; } */ //char uuid_str[APR_UUID_FORMATTED_LENGTH+1]; //int iii; //for (iii=0;iii<10;iii++) { // apr_uuid_t uuid; // apr_uuid_get(&uuid); // apr_uuid_format(uuid_str,&uuid); // printf("uuid_str=%s \n",uuid_str); //} ////////////////////////////////////////////////////////////////////////// /*新增rocketmq发送数据lnk10-10*/ void my_rocketmq_send(Ckafka_data_t& data) { static std::string topic; static std::string cfg_His_tp; static std::string cfg_PLT_tp; static std::string cfg_PST_tp; static std::string cfg_Evt_tp; static std::string cfg_Alm_tp; static std::string cfg_Rt_tp; static bool init = false; if (!init) { cfg_His_tp = TOPIC_STAT; cfg_PLT_tp = TOPIC_PLT; cfg_PST_tp = TOPIC_PST; cfg_Evt_tp = TOPIC_EVENT; cfg_Alm_tp = TOPIC_ALARM; cfg_Rt_tp = TOPIC_RTDATA; init = true; } std::string key = data.mp_id.toStdString(); std::string senddata = data.strText.toStdString(); if (data.strTopic == "HISDATA") { topic = cfg_His_tp; } else if (data.strTopic == "PLT") { topic = cfg_PLT_tp; } else if (data.strTopic == "PST") { topic = cfg_PST_tp; } else if (data.strTopic == "Event") { topic = cfg_Evt_tp; } else if (data.strTopic == "Alm") { topic = cfg_Alm_tp; } else if (data.strTopic == "RTDATA")//lnk20241220 { topic = cfg_Rt_tp; } else { topic = data.strTopic.toStdString(); } if (g_onlyIP[0] != 0) { //单例模式 add_sng_log(data.strText.toAscii().data()); } rocketmq_producer_send(const_cast(senddata.c_str()),const_cast(topic.c_str())); //printf("\nrocketmq send, monitor_id:[%s] topic:[%s] Success\n", key.c_str(), topic.c_str()); } void my_kafka_send(Ckafka_data_t& data) { #ifdef __GNUC__ static FeKafkaProducer kafkaProducer; #endif int retsize = -1; static std::string topic; static std::string cfg_His_tp; static std::string cfg_PLT_tp; static std::string cfg_PST_tp; static std::string cfg_Evt_tp; static std::string cfg_Alm_tp; static std::string cfg_Sng_tp; static bool init = false; if (!init) { cfg_His_tp = TOPIC_STAT; cfg_PLT_tp = TOPIC_PLT; cfg_PST_tp = TOPIC_PST; cfg_Evt_tp = TOPIC_EVENT; cfg_Alm_tp = TOPIC_ALARM; cfg_Sng_tp = TOPIC_SNG; //QString topic_cfg = settings.value("Kafka/topic","").toString(); //printf("!!!!!!!!!kafka producer init Failed(%s)\n", cfg_tp); cout << cfg_His_tp << endl; //std::string brokerlist = brl_cfg.toStdString();//"10.240.16.145:6667,10.240.16.146:6667,10.240.16.147:6667,10.240.16.148:6667,10.240.16.149:6667"; std::string brokerlist = BROKER_LIST; //topic = topic_cfg.toStdString();//"test"; #ifdef __GNUC__ if (kafkaProducer.init(brokerlist)) { printf("kafka producer init success(%s)\n", brokerlist.c_str()); /*bool ret = kafkaProducer.create_topic(topic); if(ret) printf("create topic OK \n"); else printf("create topic Failed \n");*/ } else printf("kafka producer init Failed(%s)\n", brokerlist.c_str()); #endif init = true; } char tmp_str[256]; apr_snprintf(tmp_str, sizeof(tmp_str), "%d", data.monitor_id); std::string key = std::string(tmp_str); //std::string key_mp_id = data.mp_id.toStdString(); //key = data.monitor_id; std::string senddata = data.strText.toStdString(); if (data.strTopic == "HISDATA") { topic = cfg_His_tp; } else if (data.strTopic == "PLT") { topic = cfg_PLT_tp; } else if (data.strTopic == "PST") { topic = cfg_PST_tp; } else if (data.strTopic == "Event") { topic = cfg_Evt_tp; add_stat_kafka_json_log(data.strText.toAscii().data()); } else if (data.strTopic == "Alm") { topic = cfg_Alm_tp; } else { topic = data.strTopic.toStdString(); } //QDateTime currentTime = QDateTime::currentDateTime(); //QTime time = currentTime.time(); //if (time >= QTime(23, 30) || time < QTime(01, 00)) { // // The current time is between 23:00 and 00:30 // add_sng_log(data.strText.toAscii().data()); //} //add_sng_log(data.strText.toAscii().data()); if (g_onlyIP[0] != 0) { //单例模式 //topic = cfg_Evt_tp; //key = "2606L20071"; //senddata = "{\"DATA_TYPE\":\"02\" , \"Monitor\":\"2606123456\" , \"Value\":{\"TIME\":\"1699576200000\", \"F_S\":{\"A\":{ \"PST\":\"0.000000\",\"FLUC\":\"343917.750000\",\"FLUCF\":\"374275.000000\" }, \"B\":{ \"PST\":\"0.000000\",\"FLUC\":\"222171.156250\",\"FLUCF\":\"369039.000000\" }, \"C\":{ \"PST\":\"0.000000\",\"FLUC\":\"208060.968750\",\"FLUCF\":\"369239.000000\" }}}}"; //senddata = "{\"DATA_TYPE\":\"04\",\"Monitor\":\"2606L20071\",\"Value\":{\"FLAG\":1,\"TIME\":1700193136480,\"VOLTAGE\":{\"MAG\":95.181,\"DUR\":54,\"STARTTIME\":1700193136480,\"ENDTIME\":1700193136634,\"DISKIND\":\"01\",\"WAVEFILE\":\"PQ_PQLD1_000392_20231117_115216_580\",\"PHASIC\":\"B\"}}}"; add_sng_log(data.strText.toAscii().data()); //char* cstr = new char[senddata.length() + 1]; //std::strcpy(cstr, senddata.c_str()); //add_sng_log(cstr); //delete[] cstr; // 释放内存空间 } #ifdef __GNUC__ //send data retsize = kafkaProducer.send(senddata.c_str(), senddata.length(), topic, RdKafka::Topic::PARTITION_UA, &key); #endif if (retsize > 0) { printf("\nkafka send, monitor_id:[%s] topic:[%s] Success,return length %d\n", key.c_str(), topic.c_str(), retsize); } else printf("\nFailed kafka send, monitor_id:[%s] topic:[%s]\n", key.c_str(), topic.c_str()); //printf("\n--------------------------------------\n%s\n--------------------------------------\n",senddata.c_str() ); // WW 2023-08-16 } void my_datahub_send(Ckafka_data_t& data) { static std::string topic; static std::string cfg_His_tp; static std::string cfg_PLT_tp; static std::string cfg_PST_tp; static std::string cfg_Evt_tp; static std::string cfg_Alm_tp; static bool init = false; if (!init) { //QString MyKafkaIniFilename = QString("../etc/") + QString("mykafka.ini"); //+QString::fromAscii(subdir) //QSettings settings(MyKafkaIniFilename, QSettings::IniFormat); //QString brl_cfg = settings.value("Kafka/brokerlist", "").toString(); //QString topic_his = settings.value("Kafka/HisTopic", "").toString(); //QString topic_plt = settings.value("Kafka/PLTTopic", "").toString(); //QString topic_pst = settings.value("Kafka/PSTTopic", "").toString(); //QString topic_evt = settings.value("Kafka/EventTopic", "").toString(); //QString topic_alm = settings.value("Kafka/AlmTopic", "").toString(); //cfg_His_tp = topic_his.toStdString(); //cfg_PLT_tp = topic_plt.toStdString(); //cfg_PST_tp = topic_pst.toStdString(); //cfg_Evt_tp = topic_evt.toStdString(); //cfg_Alm_tp = topic_alm.toStdString(); cfg_His_tp = TOPIC_STAT; cfg_PLT_tp = TOPIC_PLT; cfg_PST_tp = TOPIC_PST; cfg_Evt_tp = TOPIC_EVENT; cfg_Alm_tp = TOPIC_ALARM; init = true; } std::string key = data.mp_id.toStdString(); std::string senddata = data.strText.toStdString(); if (data.strTopic == "HISDATA") { topic = cfg_His_tp; } else if (data.strTopic == "PLT") { topic = cfg_PLT_tp; } else if (data.strTopic == "PST") { topic = cfg_PST_tp; } else if (data.strTopic == "Event") { topic = cfg_Evt_tp; } else if (data.strTopic == "Alm") { topic = cfg_Alm_tp; } else { topic = data.strTopic.toStdString(); } if (g_onlyIP[0] != 0) { //单例模式 add_sng_log(data.strText.toAscii().data()); } DataHub_Send_Datahub(const_cast(topic.c_str()), const_cast(senddata.c_str())); printf("\ndatahub send, monitor_id:[%s] topic:[%s] Success\n", key.c_str(), topic.c_str()); } void concatenate_and_separate(char str1[], char str2[], QString* result) { QString qstr1 = QString(str1); QString qstr2 = QString(str2); *result = qstr1 + "-" + qstr2; } void KafkaSendThread::run() { //线程开始创建生产者lnk20241211 InitializeProducer(); printf("\nKafkaSendThread::run() is called ...... \n\n"); while (1) { Ckafka_data_t data; bool data_gotten; data_gotten = false; kafka_data_list_mutex.lock(); if (!kafka_data_list.isEmpty()) { data_gotten = true; data = kafka_data_list.takeFirst(); } kafka_data_list_mutex.unlock(); if (data_gotten) { static uint32_t count = 0; printf("BEGIN my_kafka_send no.%i -------->>>>>>>>>>>> %s \n", count, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); if (SEND_FLAG == 1) //kafka推送 { my_kafka_send(data); } else if (SEND_FLAG == 2)//datahub推送 { my_datahub_send(data); //DataHub_Send_Datahub(); } else if (SEND_FLAG == 3)//rocketmq推送lnk10-11 { my_rocketmq_send(data); } else //未配置 默认mq推送 { my_rocketmq_send(data); } printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); } //lnk20250225添加日志上送 Ckafka_data_t log_send; log_send.strTopic = QString::fromStdString(G_LOG_TOPIC); bool log_gotten; log_gotten = false; if (normalOutputEnabled) { // 如果 normalOutputEnabled 为 1,优先从 normalList 获取输出 // 处理 normalList 的输出 pthread_mutex_lock(&normalListMutex); if (!normalList.empty()) { //qDebug() << "flag of list:" << normalOutputEnabled << " " << warnOutputEnabled << " " << errorOutputEnabled << " " << "warnList size: " << warnList.size() << "normalList size: " << normalList.size() << "errorList size: " << errorList.size()<>>>>>>>>>>> %s \n", count,QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); my_rocketmq_send(log_send); } /*if (data_gotten) { LD_info_t* LD_info = find_LD_info_only_from_mp_id(data.mp_id.toAscii().data()); ied_t* ied; ied = find_ied_from_dev_code(LD_info->terminal_code); ied_usr_t* ied_usr = GET_IEDEXT_ADDR(ied); int cpuno; for (cpuno = 0; cpuno < ied->cpucount; cpuno++) { LD_info = &(ied_usr->LD_info[cpuno]); data.mp_id.clear(); data.mp_id.append(LD_info->mp_id); static uint32_t count = 0; printf("BEGIN my_kafka_send no.%i -------->>>>>>>>>>>> %s \n", count, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); my_kafka_send(data); printf("END my_kafka_send no.%i -------->>>>>>>>>>>> %s \n\n", count++, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); } }*/ //lnk 20241031 不再记录匹配率、合理性、异常 /* oss_data_t ossdata; bool oss_data_gotten; oss_data_gotten = false; oss_data_list_mutex.lock(); if (!oss_data_list.isEmpty()) { oss_data_gotten = true; ossdata = oss_data_list.takeFirst(); } oss_data_list_mutex.unlock(); if (oss_data_gotten) { char file_name[256]; memset(file_name, 0, 256); sprintf(file_name, "%s", ossdata.filename.toAscii().data()); char save_name[256]; memset(save_name, 0, 256); sprintf(save_name, "%s", ossdata.savename.toAscii().data()); QString uuid_file_name; std::ofstream file(save_name); // 创建一个输出文件流对象,打开文件 example.txt if (file.is_open()) { // 判断文件是否成功打开 file << ossdata.data.toAscii().data() << "\n"; file.close(); // 关闭文件 } else { cout << "Unable to open file\n" << endl; } if (FILE_FLAG == 1) { PutOSS(file_name, save_name); char* file; // 使用strrchr找到最后一个'/'的位置 char* last_slash = strrchr(file_name, '/'); if (last_slash != NULL) { // 最后一个'/'之后的部分就是文件名 file = last_slash + 1; } else { // 如果没有'/',则整个字符串就是文件名 file = file_name; } concatenate_and_separate(file_name, file, &uuid_file_name); } else if (FILE_FLAG == 2) { OBSFile(save_name, file_name, "putObject"); char* file; // 使用strrchr找到最后一个'/'的位置 char* last_slash = strrchr(file_name, '/'); if (last_slash != NULL) { // 最后一个'/'之后的部分就是文件名 file = last_slash + 1; } else { // 如果没有'/',则整个字符串就是文件名 file = file_name; } concatenate_and_separate(file_name, file, &uuid_file_name); } else if (FILE_FLAG == 3) { char* fileName = (char*)malloc(65 * sizeof(char)); char* uuid = (char*)malloc(65 * sizeof(char)); WebAPI_Uds_Upload(UDS_UPLOAD_URL, save_name, uuid, fileName); concatenate_and_separate(uuid, fileName, &uuid_file_name); free(fileName); free(uuid); } else { } */ //lnk 20241031 不再记录匹配率、合理性、异常 /* if (ossdata.log_name=="comm") { char tnml_code[128]; memset(tnml_code, 0, 128); sprintf(tnml_code, "%s", ossdata.id.toAscii().data()); errorlog_pgsql(tnml_code, ossdata.time, uuid_file_name); } else if (ossdata.log_name == "reason") { QString pgsql; pgsql.append(errorlog_num_pgsql(ossdata.id, ossdata.time, uuid_file_name, ossdata.list_num)); cout << pgsql.toAscii().data() << endl; } else if (ossdata.log_name == "match") { QString pqsql; pqsql.append(errorlog_datamatch_pgsql(ossdata.id, ossdata.time, ossdata.base_mat_num, ossdata.adv_mat_num, ossdata.base_act_num, ossdata.adv_act_num, uuid_file_name)); cout << pqsql.toAscii().data() << endl; } std::remove(save_name); } else { msleep(1); }*/ } //while(1) { //线程结束摧毁生产者 ShutdownAndDestroyProducer();//lnk20241211 } //lnk20241213补招部分/////////////////////////////////////////////////////////////////////////////////////////////// // 提取 'data' 数组并返回为新的 JSON 字符串 (返回 std::string) std::string extractDataJson(const char* inputJson) { // 解析输入 JSON 字符串 cJSON* root = cJSON_Parse(inputJson); if (root == NULL) { std::cerr << "Error parsing JSON" << std::endl; return ""; } // 提取 "messageBody" 部分 cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody"); if (messageJson == NULL || messageJson->type != cJSON_String) { std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl; cJSON_Delete(root); return ""; } // 解析 messageBody 中的 JSON 字符串 const char* messageBodyStr = messageJson->valuestring; if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) { std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl; cJSON_Delete(root); return ""; } cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串 if (messageBody == NULL) { std::cerr << "Failed to parse 'messageBody' JSON." << std::endl; cJSON_Delete(root); return ""; } // 提取 "data" 部分 cJSON* data = cJSON_GetObjectItem(messageBody, "data"); if (data == NULL || data->type != cJSON_Array) { std::cerr << "'data' is missing or is not an array" << std::endl; cJSON_Delete(root); return ""; } // 创建新的 JSON 数组对象,只包含 "data" 部分 cJSON* newJson = cJSON_CreateArray(); // 创建一个新的数组 // 将 "data" 数组中的元素逐个添加到新数组中 cJSON* dataItem = NULL; cJSON_ArrayForEach(dataItem, data) { cJSON_AddItemToArray(newJson, cJSON_Duplicate(dataItem, 1)); } // 将新的 JSON 数组转换为字符串 char* newJsonString = cJSON_Print(newJson); if (newJsonString == NULL) { std::cerr << "Error printing new JSON" << std::endl; cJSON_Delete(root); cJSON_Delete(newJson); return ""; } // 转换为 std::string 类型 std::string result(newJsonString); // 清理内存 free(newJsonString); cJSON_Delete(root); cJSON_Delete(newJson); return result; // 返回 std::string 类型的结果 } //实时数据部分////////////////////////////////////////////////////////////////////////////////////////////////////////// // 提取 JSON 消息中的相关字段 bool parseJsonMessageRT(const std::string& body, std::string& devSeries, std::string& line, bool& realData, bool& soeData, int& limit) { // 解析 JSON 数据 cJSON* root = cJSON_Parse(body.c_str()); if (root == NULL) { std::cerr << "Failed to parse JSON message." << std::endl; return false; } // 提取 "messageBody" 部分 cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody"); if (messageJson == NULL || messageJson->type != cJSON_String) { std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl; cJSON_Delete(root); return false; } // 解析 messageBody 中的 JSON 字符串 const char* messageBodyStr = messageJson->valuestring; if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) { std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl; cJSON_Delete(root); return false; } cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串 if (messageBody == NULL) { std::cerr << "Failed to parse 'messageBody' JSON." << std::endl; cJSON_Delete(root); return false; } // 提取字段 cJSON* devSeriesItem = cJSON_GetObjectItem(messageBody, "devSeries"); cJSON* lineItem = cJSON_GetObjectItem(messageBody, "line"); cJSON* realDataItem = cJSON_GetObjectItem(messageBody, "realData"); cJSON* soeDataItem = cJSON_GetObjectItem(messageBody, "soeData"); cJSON* limitItem = cJSON_GetObjectItem(messageBody, "limit"); if (devSeriesItem && lineItem && realDataItem && soeDataItem && limitItem) { devSeries = devSeriesItem->valuestring; line = lineItem->valuestring; realData = realDataItem->valueint; soeData = soeDataItem->valueint; limit = limitItem->valueint; } else { std::cerr << "Missing expected fields in JSON message." << std::endl; cJSON_Delete(root); return false; } cJSON_Delete(root); // 清理 JSON 对象 return true; } // 构造 XML 内容的函数新建和删除 std::string createnewXmlContent(int devindex, int mpindex, bool realData, bool soeData, int limit) { std::ostringstream xmlContent; xmlContent << "\n" << "\n" << " \n" << " \n" << " \n" << "\n"; return xmlContent.str(); } std::string createdeleteXmlContent(int devindex, int mpindex) { std::ostringstream xmlContent; xmlContent << "\n" << "\n" << " \n" << " \n" << " \n" << "\n"; return xmlContent.str(); } // 写入 XML 内容到文件的函数 bool writeToFile(const std::string& filePath, const std::string& xmlContent) { // 打开文件流以写入 XML 内容 std::ofstream outFile(filePath.c_str()); // 使用 c_str() 转换为 const char* if (outFile.is_open()) { outFile << xmlContent; // 写入内容 outFile.close(); std::cout << "XML file created at: " << filePath << std::endl; return true; } else { std::cerr << "Failed to open file for writing: " << filePath << std::endl; return false; } } // 创建并写入新的 XML 文件的主函数 bool createXmlFile(int devindex, int mpindex, bool realData, bool soeData, int limit,std::string type) { std::string xmlContent = ""; std::string directory = ""; std::string filePath = ""; if(type == "new"){ // 构造 XML 内容 xmlContent = createnewXmlContent(devindex, mpindex, realData, soeData, limit); // 设置文件路径 directory = "../etc/trigger3s/"; filePath = directory + "newtrigger.xml"; } else if(type == "delete"){ // 构造 XML 内容 xmlContent = createdeleteXmlContent(devindex, mpindex); // 设置文件路径 directory = "../etc/trigger3s/"; filePath = directory + "deletetrigger.xml"; } else{ std::cerr << "Failed to create xmlfile,type error: " << std::endl; return false; } // 创建目录(如果不存在) if (system(("mkdir -p " + directory).c_str()) != 0) { std::cerr << "Failed to create directory: " << directory << std::endl; return false; } // 将 XML 内容写入文件 return writeToFile(filePath, xmlContent); } //////////////////////////////////////////////////////////////////////////////////////////////////////////// //lnk20250108进程更新部分 // 用于关闭进程监听的端口 extern int server_socket; //Web Socket服务端实例 void close_listening_socket() { if (server_socket != -1) { // 关闭socket close(server_socket); std::cout << "Server socket closed successfully!" << std::endl; server_socket = -1; // 重置 server_socket } else { std::cout << "No server socket to close!" << std::endl; } } //用于校验ip格式 bool isValidIP(const std::string &ip) { std::vector parts; std::stringstream ss(ip); std::string part; // 使用 "." 作为分隔符将 IP 地址分割成各部分 while (getline(ss, part, '.')) { parts.push_back(part); } // IP 地址必须有 4 部分 if (parts.size() != 4) { return false; } // 校验每一部分是否为合法的数字且在 0 到 255 之间 for (size_t i = 0; i < parts.size(); ++i) { // 校验每部分是否为数字 for (size_t j = 0; j < parts[i].size(); ++j) { if (!isdigit(parts[i][j])) { return false; } } // 转换为整数并检查是否在有效范围内 int num = atoi(parts[i].c_str()); if (num < 0 || num > 255) { return false; } // 检查是否有前导零(如 01、001 等) if (parts[i].length() > 1 && parts[i][0] == '0') { return false; } } return true; } //执行脚本控制进程 void execute_bash(string fun,int process_num,string type) { // 为 char 数组分配足够的空间 char p_num_str[20]; // 使用 sprintf 转换 std::sprintf(p_num_str, "%d", process_num); const char* script = "/FeProject/bin/set_process.sh";//使用setsid防止端口占用 const char* param1 = fun.c_str(); const char* param2 = p_num_str; const char* param3 = type.c_str(); // 构造完整的命令 char command[256]; snprintf(command, sizeof(command), "%s %s %s %s &", script, param1, param2, param3); std::cout << "command:" << command <type != cJSON_String) { std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl; cJSON_Delete(root); return ; } // 解析 messageBody 中的 JSON 字符串 const char* messageBodyStr = messageJson->valuestring; if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) { std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl; cJSON_Delete(root); return; } cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串 if (messageBody == NULL) { std::cerr << "Failed to parse 'messageBody' JSON." << std::endl; cJSON_Delete(root); return ; } // 获取 code 字段 cJSON* code = cJSON_GetObjectItem(messageBody, "code"); if (code == nullptr) { std::cout << "Missing 'code' in JSON." << std::endl; cJSON_Delete(root); return; } cJSON* index = cJSON_GetObjectItem(messageBody, "index"); if (index == nullptr) { std::cout << "Missing 'index' in JSON." << std::endl; cJSON_Delete(root); return; } //判断是不是自己进程号: int index_value = index->valueint; //string index_value_str = index->valuestring; //int index_value = StringToInt(index_value_str); //进程号为0的进程处理所有控制消息 if (index_value != g_front_seg_index && g_front_seg_index !=0) { std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl; cJSON_Delete(root); return; } //进程号为0或者进程号匹配上 std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl; // 根据 code 字段值执行不同的解析逻辑 std::string code_str = code->valuestring; if (code_str == "set_process") { if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){ std::cout << "cfg_stat_data process" << g_front_seg_index <<" handle this msg" << std::endl; // 解析 set_process cJSON* data = cJSON_GetObjectItem(messageBody, "data"); if (data != nullptr && data->type == cJSON_Array) { int data_size = cJSON_GetArraySize(data); for (int i = 0; i < data_size; i++) { cJSON* item = cJSON_GetArrayItem(data, i); std::string fun = cJSON_GetObjectItem(item, "fun")->valuestring; int processNum = cJSON_GetObjectItem(item, "processNum")->valueint; std::string frontType = cJSON_GetObjectItem(item, "frontType")->valuestring; //校验数据 if((fun == "reset" || fun == "add") && (processNum >=1 && processNum < 10) && (frontType == "stat" || frontType == "recall" || frontType == "all")){ // 调用执行脚本函数 if(fun == "reset"){ close_listening_socket(); } execute_bash(fun, processNum, frontType); std::cout << "!!!!!!!!!!!!!!!! execute mark:" << i << " !!!!!!!!!!!!!!!" <type == cJSON_Array) { int data_size = cJSON_GetArraySize(data); for (int i = 0; i < data_size; i++) { cJSON* item = cJSON_GetArrayItem(data, i); std::string fun = cJSON_GetObjectItem(item, "fun")->valuestring; std::string ip = cJSON_GetObjectItem(item, "ip")->valuestring; std::string frontType = cJSON_GetObjectItem(item, "frontType")->valuestring; int proindex = cJSON_GetObjectItem(item, "proindex")->valueint; //校验数据 if((fun == "start" || fun == "delete") && isValidIP(ip) && (frontType == "stat" || frontType == "recall" || frontType == "3s" || frontType == "comtrade") && (proindex >= 10 && proindex < 100)){ //单连测试用的进程号应该大于10小于100 execute_bash_debug(fun, ip, frontType,proindex); std::cout << "!!!!!!!!!!!!!!!! execute mark:" << i << " !!!!!!!!!!!!!!!" <" << std::endl; indentLevel++; if (code_str == "ledger_modify" || code_str == "add_terminal") { // 如果是 modify 类型 if (code_str == "ledger_modify") { add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; } else { add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; } // 添加数据部分 add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.terminal_id << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.addr_str << "" << std::endl; // Assuming `addr_str` for IP add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.dev_type << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.maint_name << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.org_name << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.port << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.station_name << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.terminal_code << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.timestamp << "" << std::endl; // Assuming `timestamp` add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.tmnl_factory << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.tmnl_status << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.dev_series << "" << std::endl; //lnk20250210 add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.processNo << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.dev_key << "" << std::endl; // monitorData 部分 for (int i = 0; json_data.line[i].monitor_id[0] != '\0'; i++) { const monitor& monitor = json_data.line[i]; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.monitor_id << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.monitor_name << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.logical_device_seq << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.voltage_level << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.terminal_connect << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.timestamp << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.terminal_code << "" << std::endl; add_indent(xmlStream, indentLevel); xmlStream << "" << monitor.status << "" << std::endl; indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; } indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; // 结束 modify 或 add 标签 if (code_str == "ledger_modify") { indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; } else { indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; } } else if (code_str == "delete_terminal") { // 如果是 delete 类型 add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel++; add_indent(xmlStream, indentLevel); xmlStream << "" << json_data.terminal_id << "" << std::endl; indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; } else { std::cerr << "code_str error" << std::endl; return ""; } // 结束根节点 indentLevel--; add_indent(xmlStream, indentLevel); xmlStream << "" << std::endl; return xmlStream.str(); // 返回构造的 XML 字符串 } // 函数:将string字符串转换为整数 int StringToInt(const std::string& str) { std::stringstream ss(str); int number; ss >> number; // 从字符串流中读取整数 // 检查是否转换成功 if (ss.fail()) { std::cerr << "Conversion failed!" << std::endl; return 0; // 或者你可以选择返回一个标识失败的值,如-1 } return number; } // 解析 JSON 字符串并执行相应操作 void parse_log(const std::string& json_str) { // 解析 JSON 字符串 cJSON* root = cJSON_Parse(json_str.c_str()); if (root == nullptr) { std::cout << "Error parsing JSON." << std::endl; return; } // 提取 "messageBody" 部分 cJSON* messageJson = cJSON_GetObjectItem(root, "messageBody"); if (messageJson == NULL || messageJson->type != cJSON_String) { std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl; cJSON_Delete(root); return ; } // 解析 messageBody 中的 JSON 字符串 const char* messageBodyStr = messageJson->valuestring; if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) { std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl; cJSON_Delete(root); return; } cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串 if (messageBody == NULL) { std::cerr << "Failed to parse 'messageBody' JSON." << std::endl; cJSON_Delete(root); return ; } // 获取 code 字段 cJSON* code = cJSON_GetObjectItem(messageBody, "code"); if (code == nullptr) { std::cout << "Missing 'code' in JSON." << std::endl; cJSON_Delete(root); return; } cJSON* index = cJSON_GetObjectItem(messageBody, "index"); if (index == nullptr) { std::cout << "Missing 'index' in JSON." << std::endl; cJSON_Delete(root); return; } //判断是不是自己进程号: int index_value = index->valueint; //string index_value_str = index->valuestring; //int index_value = StringToInt(index_value_str); //进程号为0的进程处理所有台账更新消息 if (index_value != g_front_seg_index) { std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl; cJSON_Delete(root); return; } //进程号匹配上 std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl; // 根据 code 字段值执行不同的解析逻辑 std::string code_str = code->valuestring; if (code_str == "set_log") { // 解析 set_process cJSON* data = cJSON_GetObjectItem(messageBody, "data"); if (data != nullptr && data->type == cJSON_Array) { int data_size = cJSON_GetArraySize(data); for (int i = 0; i < data_size; i++) { cJSON* item = cJSON_GetArrayItem(data, i); std::string fun = cJSON_GetObjectItem(item, "fun")->valuestring; std::string level = cJSON_GetObjectItem(item, "level")->valuestring; std::string frontType = cJSON_GetObjectItem(item, "frontType")->valuestring; //校验数据 if(frontType == subdir){ if(fun == "open"){ if (level == "ERROR"){ // 启用错误输出 redirectErrorOutput(true); } else if (level == "WARN"){ // 启用告警输出 redirectWarnOutput(true); } else if (level == "NORMAL"){ // 启用普通输出 redirectNormalOutput(true); } else{ std::cout << "level error" <type != cJSON_String) { std::cerr << "'messageJson' is missing or is not an cJSON_String" << std::endl; cJSON_Delete(root); return ; } // 解析 messageBody 中的 JSON 字符串 const char* messageBodyStr = messageJson->valuestring; if (messageBodyStr == nullptr || strlen(messageBodyStr) == 0) { std::cerr << "Failed to parse 'messageBody' JSON or it's empty." << std::endl; cJSON_Delete(root); return; } cJSON* messageBody = cJSON_Parse(messageBodyStr); // 解析 messageBody 字符串 if (messageBody == NULL) { std::cerr << "Failed to parse 'messageBody' JSON." << std::endl; cJSON_Delete(root); return ; } // 获取 code 字段 cJSON* code = cJSON_GetObjectItem(messageBody, "code"); if (code == nullptr) { std::cout << "Missing 'code' in JSON." << std::endl; cJSON_Delete(root); return; } cJSON* index = cJSON_GetObjectItem(messageBody, "index"); if (index == nullptr) { std::cout << "Missing 'index' in JSON." << std::endl; cJSON_Delete(root); return; } //判断是不是自己进程号: int index_value = index->valueint; //string index_value_str = index->valuestring; //int index_value = StringToInt(index_value_str); //进程号为0的进程处理所有台账更新消息 if (index_value != g_front_seg_index && g_front_seg_index !=0) { std::cout << "msg index:"<< index_value <<"doesnt match self index:" << g_front_seg_index << std::endl; cJSON_Delete(root); return; } //进程号为0或者进程号匹配上 std::cout << "msg index:"<< index_value <<" self index:" << g_front_seg_index << std::endl; // 根据 code 字段值执行不同的解析逻辑 std::string code_str = code->valuestring; if (code_str == "add_terminal" || code_str == "ledger_modify") { std::cout << "add or update ledger" <type == cJSON_Array) { int data_size = cJSON_GetArraySize(data); for (int i = 0; i < data_size; i++) { cJSON* item = cJSON_GetArrayItem(data, i); terminal json_data; // 填充 terminal_dev 的数据 cJSON* id = cJSON_GetObjectItem(item, "id"); // terminal_id if (id && id->type == cJSON_String) std::strncpy(json_data.terminal_id, id->valuestring, sizeof(json_data.terminal_id) - 1); else std::strncpy(json_data.terminal_id, "N/A", sizeof(json_data.terminal_id) - 1); cJSON* name = cJSON_GetObjectItem(item, "name"); // terminal_code if (name && name->type == cJSON_String) std::strncpy(json_data.terminal_code, name->valuestring, sizeof(json_data.terminal_code) - 1); else std::strncpy(json_data.terminal_code, "N/A", sizeof(json_data.terminal_code) - 1); cJSON* org_name = cJSON_GetObjectItem(item, "org_name"); // org_name if (org_name && org_name->type == cJSON_String) std::strncpy(json_data.org_name, org_name->valuestring, sizeof(json_data.org_name) - 1); else std::strncpy(json_data.org_name, "N/A", sizeof(json_data.org_name) - 1); cJSON* maint_name = cJSON_GetObjectItem(item, "maint_name"); // maint_name if (maint_name && maint_name->type == cJSON_String) std::strncpy(json_data.maint_name, maint_name->valuestring, sizeof(json_data.maint_name) - 1); else std::strncpy(json_data.maint_name, "N/A", sizeof(json_data.maint_name) - 1); cJSON* station_name = cJSON_GetObjectItem(item, "stationName"); // station_name if (station_name && station_name->type == cJSON_String) std::strncpy(json_data.station_name, station_name->valuestring, sizeof(json_data.station_name) - 1); else std::strncpy(json_data.station_name, "N/A", sizeof(json_data.station_name) - 1); cJSON* manufacturer = cJSON_GetObjectItem(item, "manufacturer"); // tmnl_factory if (manufacturer && manufacturer->type == cJSON_String) std::strncpy(json_data.tmnl_factory, manufacturer->valuestring, sizeof(json_data.tmnl_factory) - 1); else std::strncpy(json_data.tmnl_factory, "N/A", sizeof(json_data.tmnl_factory) - 1); cJSON* status = cJSON_GetObjectItem(item, "status"); // tmnl_status if (status && status->type == cJSON_String) std::strncpy(json_data.tmnl_status, status->valuestring, sizeof(json_data.tmnl_status) - 1); else std::strncpy(json_data.tmnl_status, "N/A", sizeof(json_data.tmnl_status) - 1); cJSON* dev_type = cJSON_GetObjectItem(item, "devType"); // dev_type if (dev_type && dev_type->type == cJSON_String) std::strncpy(json_data.dev_type, dev_type->valuestring, sizeof(json_data.dev_type) - 1); else std::strncpy(json_data.dev_type, "N/A", sizeof(json_data.dev_type) - 1); cJSON* dev_key = cJSON_GetObjectItem(item, "devKey"); // dev_key if (dev_key && dev_key->type == cJSON_String) std::strncpy(json_data.dev_key, dev_key->valuestring, sizeof(json_data.dev_key) - 1); else std::strncpy(json_data.dev_key, "N/A", sizeof(json_data.dev_key) - 1); cJSON* dev_series = cJSON_GetObjectItem(item, "series"); // dev_series if (dev_series && dev_series->type == cJSON_String) std::strncpy(json_data.dev_series, dev_series->valuestring, sizeof(json_data.dev_series) - 1); else std::strncpy(json_data.dev_series, "N/A", sizeof(json_data.dev_series) - 1); //lnk20250210台账进程号 cJSON* processNo = cJSON_GetObjectItem(item, "processNo"); // processNo转为字符串 if (processNo && processNo->type == cJSON_Number) snprintf(json_data.processNo, sizeof(json_data.processNo), "%d", processNo->valueint); else strncpy(json_data.processNo, "N/A", sizeof(json_data.processNo) - 1); cJSON* ip = cJSON_GetObjectItem(item, "ip"); // addr_str if (ip && ip->type == cJSON_String) std::strncpy(json_data.addr_str, ip->valuestring, sizeof(json_data.addr_str) - 1); else std::strncpy(json_data.addr_str, "N/A", sizeof(json_data.addr_str) - 1); cJSON* port = cJSON_GetObjectItem(item, "port"); // port if (port && port->type == cJSON_String) std::strncpy(json_data.port, port->valuestring, sizeof(json_data.port) - 1); else std::strncpy(json_data.port, "N/A", sizeof(json_data.port) - 1); cJSON* updateTime = cJSON_GetObjectItem(item, "updateTime"); // timestamp if (updateTime && updateTime->type == cJSON_String) std::strncpy(json_data.timestamp, updateTime->valuestring, sizeof(json_data.timestamp) - 1); else std::strncpy(json_data.timestamp, "N/A", sizeof(json_data.timestamp) - 1); // monitorData 解析,填充到 line 数组中 cJSON* monitorData = cJSON_GetObjectItem(item, "monitorData"); if (monitorData != nullptr && monitorData->type == cJSON_Array) { int monitorData_size = cJSON_GetArraySize(monitorData); for (int j = 0; j < monitorData_size && j < 10; j++) { // 最多 10 个监测点 cJSON* monitor_item = cJSON_GetArrayItem(monitorData, j); monitor monitor_data; cJSON* monitor_id = cJSON_GetObjectItem(monitor_item, "id"); // monitor_id if (monitor_id && monitor_id->type == cJSON_String) std::strncpy(monitor_data.monitor_id, monitor_id->valuestring, sizeof(monitor_data.monitor_id) - 1); else std::strncpy(monitor_data.monitor_id, "N/A", sizeof(monitor_data.monitor_id) - 1); cJSON* monitor_name = cJSON_GetObjectItem(monitor_item, "name"); // monitor_name if (monitor_name && monitor_name->type == cJSON_String) std::strncpy(monitor_data.monitor_name, monitor_name->valuestring, sizeof(monitor_data.monitor_name) - 1); else std::strncpy(monitor_data.monitor_name, "N/A", sizeof(monitor_data.monitor_name) - 1); cJSON* voltage_level = cJSON_GetObjectItem(monitor_item, "voltageLevel"); // voltage_level if (voltage_level && voltage_level->type == cJSON_String) std::strncpy(monitor_data.voltage_level, voltage_level->valuestring, sizeof(monitor_data.voltage_level) - 1); else std::strncpy(monitor_data.voltage_level, "N/A", sizeof(monitor_data.voltage_level) - 1); cJSON* monitor_status = cJSON_GetObjectItem(monitor_item, "status"); // status if (monitor_status && monitor_status->type == cJSON_String) std::strncpy(monitor_data.status, monitor_status->valuestring, sizeof(monitor_data.status) - 1); else std::strncpy(monitor_data.status, "N/A", sizeof(monitor_data.status) - 1); cJSON* lineNo = cJSON_GetObjectItem(monitor_item, "lineNo"); // logical_device_seq if (lineNo && lineNo->type == cJSON_String) std::strncpy(monitor_data.logical_device_seq, lineNo->valuestring, sizeof(monitor_data.logical_device_seq) - 1); else std::strncpy(monitor_data.logical_device_seq, "N/A", sizeof(monitor_data.logical_device_seq) - 1); cJSON* ptType = cJSON_GetObjectItem(monitor_item, "ptType"); // terminal_connect if (ptType && ptType->type == cJSON_String) std::strncpy(monitor_data.terminal_connect, ptType->valuestring, sizeof(monitor_data.terminal_connect) - 1); else std::strncpy(monitor_data.terminal_connect, "N/A", sizeof(monitor_data.terminal_connect) - 1); std::strncpy(monitor_data.timestamp, json_data.timestamp, sizeof(monitor_data.timestamp) - 1); std::strncpy(monitor_data.terminal_code, json_data.terminal_code, sizeof(monitor_data.terminal_code) - 1); // 填充到 line 数组 json_data.line[j] = monitor_data; } } print_terminal(&json_data); // 准备 XML 内容并写入文件 std::string xmlContent = prepare_update(code_str, json_data); if (xmlContent != "") { std::cout << "write to xml in /FeProject/etc/ledger_update" <type == cJSON_Array) { int data_size = cJSON_GetArraySize(data); for (int i = 0; i < data_size; i++) { cJSON* item = cJSON_GetArrayItem(data, i); // 只解析 id 字段 cJSON* id = cJSON_GetObjectItem(item, "id"); if (id != nullptr) { terminal json_data; std::strncpy(json_data.terminal_id, cJSON_GetObjectItem(item, "id")->valuestring, sizeof(json_data.terminal_id) - 1); // 准备 XML 内容并写入文件 std::string xmlContent = prepare_update(code_str, json_data); if(xmlContent != ""){ char nodeid[20]; std::sprintf(nodeid, "%u", g_node_id); // "%u" 用于 unsigned int std::string nodeid_str(nodeid); std::string frontindex_str = intToString(g_front_seg_index); std::string file_name = output_dir + "/" + nodeid_str + "_" + frontindex_str + "_" + json_data.terminal_id + "_delete_terminal.xml"; writeToFile(file_name, xmlContent); } } } } } else{ std::cout << "code_str error" <n_clients; iedno++) { ied = g_node->clients[iedno]; ied_usr = (ied_usr_t*)ied->usr_ext; if (ied_usr && strcmp(ied_usr->terminal_id, dev_id.c_str()) == 0) { return ied_usr->dev_idx; } } return 0; } int find_mp_index_from_mp_id(std::string line) { LD_info_t* LD_info = NULL; LD_info = find_LD_info_only_from_mp_id((char*)line.c_str()); if(LD_info == NULL){ return 0; } else{ return LD_info->line_id; } } int myMessageCallbackrtdata(CPushConsumer* consumer, CMessageExt* msg) { if (msg == NULL) { std::cerr << "Received null message." << std::endl; return E_RECONSUME_LATER; } const char* body = GetMessageBody(msg); const char* key = GetMessageKeys(msg); if (body == NULL) { std::cerr << "Message body is NULL." << std::endl; return E_RECONSUME_LATER; } else{ // 处理消息(例如,打印消息内容) std::cout << "rt data Callback received message: " << body << std::endl; if (key) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } //处理消费数据 std::string devid, line; bool realData, soeData; int limit; // 解析 JSON 数据 if (!parseJsonMessageRT(body, devid, line, realData, soeData, limit)) { std::cerr << "Failed to parse the JSON message." << std::endl; return E_RECONSUME_LATER; } //mq处理实时数据指令查询台账时添加锁 pthread_mutex_lock(&mtx); std::cout << "update ledger xml hold lock !!!!!!!!!!!" << std::endl; int dev_index = find_dev_index_from_dev_id(devid); int mp_index = find_mp_index_from_mp_id(line); pthread_mutex_unlock(&mtx); std::cout << "update ledger xml free lock !!!!!!!!!!!" << std::endl; if(dev_index == 0 || mp_index == 0){ std::cerr << "dev index or mp index is 0" << std::endl; return E_RECONSUME_LATER; } // 创建 XML 文件 if (!createXmlFile(dev_index, mp_index, realData, soeData, limit,"new")) { std::cerr << "Failed to create the XML file." << std::endl; return E_RECONSUME_LATER; } } // 根据业务逻辑决定返回状态 return E_CONSUME_SUCCESS; } int myMessageCallbackupdate(CPushConsumer* consumer, CMessageExt* msg) { if (msg == NULL) { std::cerr << "Received null message." << std::endl; return E_RECONSUME_LATER; } const char* body = GetMessageBody(msg); const char* key = GetMessageKeys(msg); if (body == NULL) { std::cerr << "Message body is NULL." << std::endl; return E_RECONSUME_LATER; } else{ //处理消费数据 // 处理消息(例如,打印消息内容) std::cout << "ledger update Callback received message: " << body << std::endl; if (key) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } //处理台账更新消息 std::string updatefilepath = "/home/pq/FeProject/etc/ledgerupdate"; parse_control(body,updatefilepath); } // 根据业务逻辑决定返回状态 return E_CONSUME_SUCCESS; } int myMessageCallbackset(CPushConsumer* consumer, CMessageExt* msg) { if (msg == NULL) { std::cerr << "Received null message." << std::endl; return E_RECONSUME_LATER; } const char* body = GetMessageBody(msg); const char* key = GetMessageKeys(msg); if (body == NULL) { std::cerr << "Message body is NULL." << std::endl; return E_RECONSUME_LATER; } else{ //处理消费数据 // 处理消息(例如,打印消息内容) std::cout << "process Callback received message: " << body << std::endl; if (key) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } //处理进程更新消息 parse_set(body); } // 根据业务逻辑决定返回状态 return E_CONSUME_SUCCESS; } int myMessageCallbacklog(CPushConsumer* consumer, CMessageExt* msg) { if (msg == NULL) { std::cerr << "Received null message." << std::endl; return E_RECONSUME_LATER; } const char* body = GetMessageBody(msg); const char* key = GetMessageKeys(msg); if (body == NULL) { std::cerr << "Message body is NULL." << std::endl; return E_RECONSUME_LATER; } else{ //处理消费数据 // 处理消息(例如,打印消息内容) std::cout << "process Callback received message: " << body << std::endl; if (key) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } //处理进程更新消息 parse_log(body); } // 根据业务逻辑决定返回状态 return E_CONSUME_SUCCESS; } int myMessageCallbackrecall(CPushConsumer* consumer, CMessageExt* msg) { //调试 std::cout << "myMessageCallbackrecall"<< std::endl; if (msg == NULL) { std::cerr << "Received null message." << std::endl; return E_RECONSUME_LATER; } const char* body = GetMessageBody(msg); const char* key = GetMessageKeys(msg); if (body == NULL) { std::cerr << "Message body is NULL." << std::endl; return E_RECONSUME_LATER; } else{ // 处理消息(例如,打印消息内容) std::cout << "recall Callback received message: " << body << std::endl; if (key) { std::cout << "Message Key: " << key << std::endl; } else { std::cout << "Message Key: N/A" << std::endl; } //处理消费数据 std::string result = extractDataJson(body); // 使用 std::string 代替 malloc //调试 std::cout << "extractDataJson:"<< result.c_str() < subscriptions; // 初始化消费者1 //lnk20241230只有实时进程会订阅实时topic,不订阅实时topic的进程无法触发实时数据 if(g_node_id == THREE_SECS_DATA_BASE_NODE_ID){ subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_RT, G_MQCONSUMER_TAG_RT, myMessageCallbackrtdata)); } // 初始化消费者2 //所有进程都会订阅台账更新topic,不同功能进程的台账不能互相影响 subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_UD, G_MQCONSUMER_TAG_UD, myMessageCallbackupdate)); // 初始化消费者3 //lnk20241230只有补招进程会订阅补招topic,不订阅补招topic的进程无法触发补招数据 if(g_node_id == RECALL_HIS_DATA_BASE_NODE_ID){ subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_RC, G_MQCONSUMER_TAG_RC, myMessageCallbackrecall)); } // 初始化消费者4 //lnk20250108只有稳态进程1会订阅控制topic,不订阅控制topic的进程无法触发进程重置 if(g_node_id == STAT_DATA_BASE_NODE_ID && g_front_seg_index == 1){ subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_SET, G_MQCONSUMER_TAG_SET, myMessageCallbackset)); } // 初始化消费者5 //所有进程都会订阅日志上送topic,不同功能进程的日志上送不能互相影响 subscriptions.push_back(Subscription(G_MQCONSUMER_TOPIC_LOG, G_MQCONSUMER_TAG_LOG, myMessageCallbacklog)); try { rocketmq_consumer_receive(consumerName, nameServer, subscriptions); } catch (const std::exception& e) { std::cerr << "Exception during consumerUD setup: " << e.what() << std::endl; } // 程序运行中,消费者会通过回调处理消息 // 模拟程序运行 std::cout << "Consumer is running. " << std::endl; //在主线程调用 //ShutdownAndDestroyConsumer(); } //CZY 2023-08-23 get double class voltage level, if false will return 0; double get_voltage_level(char voltage_level_char[]) { try { int n = atoi(voltage_level_char); switch (n) { case 1://交流6V return 0.006; case 2://交流12V return 0.012; case 3://交流24V return 0.024; case 4://交流36V return 0.036; case 5://交流48V return 0.048; case 6://交流110V return 0.11; case 7://交流220V return 0.22; case 8://交流380V(含400V) return 0.38; case 9://交流660V return 0.66; case 10://交流1000V(含1140V) return 1; case 11://交流600V return 0.6; case 12://交流750V return 0.75; case 13://交流1500V return 1.5; case 14://交流2000V return 2.0; case 15://交流2500V return 2.5; case 20://交流3kV return 3; case 21://交流6kV return 6; case 22://交流10kV return 10; case 23://交流15.75kV return 15.75; case 24://交流20kV return 20; case 25://交流35kV return 35; case 30://交流66kV return 66; case 31://交流72.5kV return 72.5; case 32://交流110kV return 110; case 33://交流220kV return 220; case 34://交流330kV return 330; case 35://交流500kV return 500; case 36://交流750kV return 750; case 37://交流1000kV return 1000; case 51://直流6V return 0.006; case 52://直流12V return 0.012; case 53://直流24V return 0.024; case 54://直流36V return 0.036; case 55://直流48V return 0.048; case 56://直流110V return 0.11; case 60://直流220V return 0.22; case 70://直流600V return 0.6; case 71://直流750V return 0.75; case 72://直流1500V return 1.5; case 73://直流3000V return 3.0; case 76://直流35kV return 35; case 77://直流30kV return 30; case 78://直流50kV return 50; case 80://直流120kV return 120; case 81://直流125kV return 125; case 82://直流400kV return 400; case 83://直流500kV return 500; case 84://直流660kV return 660; case 85://直流800kV return 800; case 86://直流1000kV return 1000; case 87://直流200kV return 200; case 88://直流320kV。 return 320; default: return 0; break; } } catch (const std::exception&) { //error return 0; } } void try_start_kafka_thread() { static int kafka_thread_created = 0; if (!kafka_thread_created) { myThrd.start(); kafka_thread_created = 1; } } //lnk20241213 void try_start_mqconsumer_thread() { static int mqconsumer_thread_created = 0; if (!mqconsumer_thread_created) { mqconsumerThrd.start(); mqconsumer_thread_created = 1; } } ///////////////////////////////////////////////////////////////////////// json_block_data json_blkd; //void init_json_block_data() //{ // json_blkd.monitorId = -1; // json_blkd.func_type = g_node_id; // //flag 是品质, 异常送1, 正常送0 // json_blkd.flag = 0; // //剔除标记,1不剔除,0剔除,默认剔除 // json_blkd.mms_str_map.clear(); //} //CZY 2023-08-17 WW 2022年12月6日14:09:08 增加多个ICD支持 //json_block_data json_blkd; //json拼接参数类对象,原有的一个数据对象,在多ICD下会出现数据错位问题 //解决方案是将此数据放入LDInfo结构中存储,保证一条线路一个json拼接参数类对象 void init_json_block_data(char mp_id[], char voltage_level[], int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改 { // 将 char[] 转换为 std::string //QString keyString(mp_id); json_block_data* pdata; if (flicker_flag == 1) { if (!json_flicker_data_map.contains(mp_id)) { pdata = new json_block_data(); json_flicker_data_map.insert(mp_id, pdata); } pdata = json_flicker_data_map.value(mp_id); } else if (flicker_flag == 0) { if (!json_data_map.contains(mp_id)) { pdata = new json_block_data(); json_data_map.insert(mp_id, pdata); } pdata = json_data_map.value(mp_id); } else if (flicker_flag == 2) { if (!json_pst_data_map.contains(mp_id)) { pdata = new json_block_data(); json_pst_data_map.insert(mp_id, pdata); } pdata = json_pst_data_map.value(mp_id); } if (pdata == NULL) return; pdata->monitorId = -1; QString tmp; tmp.append(mp_id); pdata->mp_id = tmp; pdata->func_type = g_node_id; //flag 是品质, 异常送1, 正常送0 pdata->flag = 0; // //剔除标记,1不剔除,0剔除,默认剔除 pdata->mms_str_map.clear(); pdata->voltage_level = get_voltage_level(voltage_level); //CZY 2023-08-23 add voltage_level } //0. json生成开始函数 //int json_block_create_start(int MonitorId ) //{ // try_start_kafka_thread(); // // init_json_block_data(); // json_blkd.monitorId = MonitorId; // printf("\n\n---------- json_block_create_start: MonitorId=%d \n",MonitorId); // return TRUE; //} int json_block_create_start(char voltage_level[], char monid_char[], int flicker_flag, char temcode[], int line_id)//WW 2023年3月13日16:38 : 41 多ICD修改 { try_start_kafka_thread(); //WW 2023-08-22 增加数据库线程 //try_start_sql_thread();//lnk2024118不需要sql线程 //WW end init_json_block_data(monid_char, voltage_level, flicker_flag); json_block_data* pdata; if (flicker_flag == 1) { if (!json_flicker_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_flicker_data_map.value(monid_char); } else if (flicker_flag == 0) { if (!json_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_data_map.value(monid_char); } else if (flicker_flag == 2) { if (!json_pst_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_pst_data_map.value(monid_char); } if (pdata != NULL) { pdata->dev_type.append(temcode); pdata->monitorId = line_id; if (strlen(monid_char) != 0) { QString tmp; tmp.append(monid_char); pdata->mp_id = tmp; } else { monid_char = "not define"; QString tmp; tmp.append(monid_char); pdata->mp_id = tmp; } } printf("\n\n---------- json_block_create_start: mp_id=%s,voltage_level=%s,line_id=%d \n", monid_char, voltage_level, line_id); return TRUE; } //1. json生成开始函数 //int json_block_create_time(int MonitorId , long long Time) //{ // json_blkd.time = Time; // printf("\njson_block_create_time: MonitorId=%d,Time=%lld \n",MonitorId,Time); // return TRUE; //} int json_block_create_time(char monid_char[], long long Time, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改 { json_block_data* pdata; if (flicker_flag == 1) { if (!json_flicker_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_flicker_data_map.value(monid_char); } else if (flicker_flag == 0) { if (!json_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_data_map.value(monid_char); } else if (flicker_flag == 2) { if (!json_pst_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_pst_data_map.value(monid_char); } if (pdata != NULL) pdata->time = Time; printf("\njson_block_create_time: mp_id=%s,Time=%lld \n", monid_char, Time); return TRUE; } //int json_block_create_flag(int MonitorId , int flag) //{ // json_blkd.flag = flag; // printf("\njson_block_create_flag: MonitorId=%d,flag=%d \n",MonitorId,flag); // return TRUE; //} int json_block_create_flag(char monid_char[], int flag, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改 { json_block_data* pdata; if (flicker_flag == 1) { if (!json_flicker_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_flicker_data_map.value(monid_char); } else if (flicker_flag == 0) { if (!json_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_data_map.value(monid_char); } else if (flicker_flag == 2) { if (!json_pst_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_pst_data_map.value(monid_char); } if (pdata != NULL) pdata->flag = flag; printf("\njson_block_create_flag: mp_id=%s,flag=%d \n", monid_char, flag); return TRUE; } //2. json生成数据回调函数 //int json_block_create_data(int MonitorId , char* mms_str , double v ) //{ // static int count = 0; // //WW2023-08-16 去掉log注释 // //printf("#"); // //if ( ((count++ %1000)==0) || (count <2000) ) // // printf("\n%d:json_block_create_data: MonitorId=%d,mms_str=%s,v=%f \n",count,MonitorId,mms_str,v); // // json_blkd.mms_str_map.insert(QString::fromAscii(mms_str), v); // return TRUE; //} int json_block_create_data(char monid_char[], char* mms_str, double v, int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改 { json_block_data* pdata; if (flicker_flag == 1) { if (!json_flicker_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_flicker_data_map.value(monid_char); } else if (flicker_flag == 0) { if (!json_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_data_map.value(monid_char); } else if (flicker_flag == 2) { if (!json_pst_data_map.contains(monid_char))//未查到数据 return 0; pdata = json_pst_data_map.value(monid_char); } static int count = 0; if (pdata != NULL) { pdata->mms_str_map.insert(QString::fromAscii(mms_str), v); if (strstr(mms_str, "MMXU2$MX$PhV")) printf("---------- json_block_create_data: mp_id= %s ,mms_str=%s value=%fkV----------\n", monid_char, mms_str, v); } return TRUE; } //3. json生成结束函数 //int json_block_create_end(int MonitorId ) //{ // printf("\n---------- json_block_create_end: MonitorId=%d \n\n\n",MonitorId); // // return transfer_json_block_data(&json_blkd); //} //lnk2024-8-16添加接线参数 int json_block_create_end(char v_wiring_type[], char monid_char[], int flicker_flag)//WW 2023年3月13日16:38:41 多ICD修改 { json_block_data* pdata; if (flicker_flag == 1) { if (!json_flicker_data_map.contains(monid_char))//未查到数据 { printf("---------- json_block_create_end: mp_id= %s json_flicker_data_map can't find MonitorId----------\n", monid_char); return 1; } pdata = json_flicker_data_map.value(monid_char); } else if (flicker_flag == 0) { if (!json_data_map.contains(monid_char))//未查到数据 { printf("---------- json_block_create_end: mp_id= %s json_data_map can't find MonitorId----------\n", monid_char); return 1; } pdata = json_data_map.value(monid_char); } else if (flicker_flag == 2) { if (!json_pst_data_map.contains(monid_char))//未查到数据 { printf("---------- json_block_create_end: mp_id= %s json_pst_data_map can't find MonitorId----------\n", monid_char); return 1; } pdata = json_pst_data_map.value(monid_char); } //int ret = transfer_json_block_data(pdata, DevKind);//CZY 2023-08-17 需要测试 if (pdata->mms_str_map.count() == 0) { if (flicker_flag == 1) { json_flicker_data_map.remove(monid_char); } else if (flicker_flag == 0) { json_data_map.remove(monid_char); } else if (flicker_flag == 2) { json_pst_data_map.remove(monid_char); } printf("---------- json_block_create_end: pdata->mms_str_map.count() == 0 ----------\n"); return 1; } //lnk2024-8-16添加接线参数 int ret = transfer_json_block_data(v_wiring_type, pdata); if (pdata != NULL) delete pdata; if (flicker_flag == 1) { json_flicker_data_map.remove(monid_char); } else if (flicker_flag == 0) { json_data_map.remove(monid_char); } else if (flicker_flag == 2) { json_pst_data_map.remove(monid_char); } printf("---------- json_block_create_end: MonitorId= %s ----------\n", monid_char); return ret; } //#define STATUS_NORMAL 0 /**< 正常 */ //拼接Kafka Producer发送暂态事件消息 例: //{"DATA_TYPE":"03", "TIME":"1542960911734", "1268918860":["CommResume"]} void prcess_monitor_comm_2_json(int monitor_id, int status, long long tm) { Ckafka_data_t data; QString status_str = (status == 0) ? "CommResume" : "CommInterrupt"; try_start_kafka_thread(); data.monitor_id = monitor_id; data.strTopic = "RTDATASOE"; data.strText = QString("{\"DATA_TYPE\":\"03\", \"TIME\":\"%1\", \"%2\":[\"%3\"]}") .arg(tm).arg(monitor_id).arg(status_str); //发生时刻,毫秒 //装置序号 例:1268918860 QString str = data.strTopic + " " + data.strText; printf("prcess_monitor_comm_2_json: %s \n", str.toStdString().c_str()); kafka_data_list_mutex.lock(); kafka_data_list.append(data); kafka_data_list_mutex.unlock(); } ////////////////////////////////////////////////////////////////////////////// //int transfer_json_block_data(json_block_data *data) //{ // Ckafka_data_t kafka_data; // kafka_data.patition_id = 0; // kafka_data.strText = QString("Time=%1").arg(data->time); // // kafka_data_list_mutex.lock(); // kafka_data_list.append(kafka_data); // kafka_data_list_mutex.unlock(); // return TRUE; //} void clear_old_comtrade_files() { if (g_node_id != SOE_COMTRADE_BASE_NODE_ID) return; QString full_fn_str; QString dir_name("../comtrade/"); QDir directory_comtrade(dir_name); QStringList fileNames = directory_comtrade.entryList(QDir::Files | QDir::NoDotAndDotDot, QDir::Time); if (fileNames.size() <= comtrade_remain_file_num) return; for (QStringList::size_type i = comtrade_remain_file_num; i != fileNames.size(); ++i) { full_fn_str = dir_name + fileNames.at(i); QFile::remove(full_fn_str); } } ///////////////////////////////////////////// //using namespace std; int process_login_verify() { int length = 64; char password[64 + 1]; char* p = NULL; int count = 0; char encode_password[256]; //password = "njcnpqs@2018" const char* passwordConfirm = "1c0e4e104de596846648ba495bd32601"; memset(password, 0, sizeof(password)); printf("Please input password : \n"); p = password; count = 0; system("stty -echo"); std::cin.getline(password, 64); system("stty echo"); //while (((*p = getch()) != 13) && count < length) { // //putch('*'); // //fflush(stdin); // p++; // count++; //} password[length] = '\0'; //printf("input typed password : %s \n",password); MyGetSM4Code(password, (unsigned char*)"epri.sgcc.com.cn", encode_password); //printf("encode_password : %s ,should be %s \n",encode_password,passwordConfirm); return (strcmp(encode_password, passwordConfirm)); } //////////////////////////////////////////// /////////////////////////////////////////// //WW 2023-08-22 增加数据库线程和WebSokcet线程 void SQLExcuteThread::run() { //if (THREE_SECS_DATA_BASE_NODE_ID == g_node_id)//3秒数据传输不需要写库 //return; if (1 != g_iOTLFlag) { Sql_data_list.clear(); return; } static uint32_t connect_state = 0; static uint32_t sql_count = 0;//2024-04-01 const char* pSql = nullptr; printf("SqlExcuteThread::run() is called ...... \n\n"); while (1) { msleep(1); if (!Sql_data_list.isEmpty()) { if (0 == sql_count++ % 300) { //db.connected int rtState = OTLDbconnected(); //int rtState = db.connected; if (rtState == 0 || connect_state != 0) { OTLDisconnect(); int ret = OTLConnect(); if (ret != 0 && ret != 32031) { bool bExit = false; for (int i = 0; i < 3; i++) { OTLDisconnect(); ret = OTLConnect(); if (ret != 0 && ret != 32031) { if (2 == i) bExit = true; else printf(">>>Postgresql reconnect %d times,errorcode= %d \n", i + 1, ret); } } if (bExit) { printf(">>>Postgresql reconnect 3 times,errorcode= %d,end thread!\n", ret); sleep(30); continue; //return; } } } } // printf("(写库)Sql执行语句链表Sql_data_list允许最大元素个数= %d,实有元素个数= %d \n", g_iSqlListSize, Sql_data_list.size()); Sql_data_list_mutex.lock(); std::string strSql = Sql_data_list.takeFirst().toStdString(); printf("get one sql \n"); if (strSql.length() < 11) { // printf("(写库)Sql执行语句链表Sql_data_list剩余元素个数= %d,当前执行Sql= %s,continue下一语句!\n", Sql_data_list.count(), strSql.c_str()); continue; } pSql = strSql.c_str(); Sql_data_list_mutex.unlock(); //printf("BEGIN my_sql_excute no.%i -------->>>>>>>> %s \n", count, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); /*if (2 == Log_Enable) printf("(写库)Sql执行语句链表Sql_data_list剩余元素个数= %i,当前执行Sql.%i= %s \n", Sql_data_list.count(), count, pSql);*/ printf("write one sql %s \n", pSql); int rt = write_to_db(pSql); connect_state = rt; printf("connect state %d \n", connect_state); //if (0 == rt) //{ // if (1 == Log_Enable) // printf("(写库)Sql执行成功.%i \n", count); // else // printf("(写库)Sql执行成功.%i,Sql= %s \n", count, pSql); //} //printf("END my_sql_excute no.%i -------->>>>>>>> %s \n\n", count++, QDateTime::currentDateTime().toString("yyyy-MM-dd hh:mm:ss.zzz").toAscii().data()); } } printf(">>>SqlExcuteThread::run() is end!!!\n"); } void try_start_sql_thread() { static int sql_thread_created = 0; if (!sql_thread_created) { //if (2 == Log_Enable) // printf(">>>即将启动Sql执行线程!\n"); sqlThrd.start(); sql_thread_created = 1; } } void try_start_socket_thread() { static int socket_thread_created = 0; if (!socket_thread_created) { //if (2 == Log_Enable) // printf(">>>即将启动Web Socket线程!\n"); socketThrd.start(); socket_thread_created = 1; } } //lnk20241029 void try_start_web_http_thread() { static int webhttp_thread_created = 0; if (!webhttp_thread_created) { webhttpThrd.start(); webhttp_thread_created = 1; } } void try_start_http_thread() { static int http_thread_created = 0; if (!http_thread_created) { httpThrd.start(); http_thread_created = 1; } } //lnk20241202 int try_start_mqtest_thread(int argc, char *argv[]) { //不使用简单的循环线程,而是启动一个app,不仅执行循环线程,而且可以连接输入 /*static int mqtest_thread_created = 0; if (!mqtest_thread_created) { mqtestThrd.start(); mqtest_thread_created = 1; }*/ QCoreApplication a(argc, argv); // 创建 QThread 和 Worker 对象 QThread *thread = new QThread(); Worker *worker = new Worker(); // 将 Worker 对象移动到 QThread 中 worker->moveToThread(thread); // 连接信号和槽 QObject::connect(thread, SIGNAL(started()), worker, SLOT(startServer())); QObject::connect(worker, SIGNAL(serverError()), thread, SLOT(quit())); QObject::connect(worker, SIGNAL(serverError()), worker, SLOT(deleteLater())); QObject::connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater())); // 启动线程 thread->start(); //std::cout << "start_mqtest"<>>即将启动Web Socket线程!\n"); onTimerThrd.start(); ontimer_thread_created = 1; } } //WW 2023-08-22 end /////////////////////////////////////////// //ZW 2024-01-31 补招数据模式优化 static QMap mvl_type_ctrl_map;//ZW 2024-01-31 用于保存单次获取的模型 static int mvl_type_ctrl_map_size;//计数 //static std::map myMap; //添加doname对应的数据模型 void add_mvl_type_ctrl(char doname[], int ctrl) { //printf("\nadd_mvl_type_ctrl: %s\n", doname); //printf("\nadd_mvl_type_ctrl: %p////%p\n", &ctrl,©); if (!mvl_type_ctrl_map.contains(doname)) { //MVL_TYPE_CTRL* copy = ctrl; mvl_type_ctrl_map.insert(doname, ctrl); } //printf("\nadd_mvl_type_ctrl: %p\n", &doname); } //删除map中所有数据模型 void del_mvl_type_ctrl() { for (QMap::iterator it = mvl_type_ctrl_map.begin(); it != mvl_type_ctrl_map.end(); ++it) { QString key = it.key(); int value = it.value(); mvl_type_id_destroy(value); } mvl_type_ctrl_map.clear(); } int get_mvl_type_ctrl_map_size() { return mvl_type_ctrl_map.count(); } //查找对应doname的数据模型是否存在map中 int sel_mvl_type_ctrl_flag(char doname[]) { if (mvl_type_ctrl_map.contains(doname)) { return mvl_type_ctrl_map.value(doname); } else { return -1; } } //ZW 2024-01-31 end