/** * @file: $RCSfile: rdb_client.c,v $ * @brief: $PROFIBUS 与SSRTDB交互 * * @version: $Revision: 1.11 $ * @date: $Date: 2020/10/28 05:21:18 $ * @author: $Author: lizhongming $ * @state: $State: Exp $ * * @latest: $Id: rdb_client.c,v 1.11 2020/10/28 05:21:18 lizhongming Exp $ */ #include #include "rdb_client.h" #include "db_interface.h" #include "node.h" #include //lnk20250114给台账添加互斥锁 /*lnk10-10 *///////////////////////////////// extern int HTTP_PORT; extern int SOCKET_PORT; extern int G_TEST_FLAG; extern int g_front_seg_index; extern int g_front_seg_num; #include "../include/rocketmq/SimpleProducer.h" //////////////////////////////////////////// #ifdef DEBUG_SISCO SD_CONST static ST_CHAR* SD_CONST thisFileName = __FILE__; #endif extern RPT_TYPEIDS g_rpt_typeids; //ied_info_t *my_info; extern apr_pool_t* g_root_pool; uint8_t set_mx_q; //rdb_t *g_rdb = NULL; node_t* g_node = NULL; extern char g_my_conf_fname[256]; apr_pool_t* g_init_pool; apr_pool_t* g_run_pool; apr_pool_t* g_temp_dev_pool; //lnk20250114给台账添加互斥锁 pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; extern char g_onlyIP[255]; //直连某个IP,仅仅为方便测试 //为无锡西径变调档添加 uint8_t set_mx_q; pt61850app_t* g_pt61850app; //application_t g_sysfile_app; //系统文件库应用信息结构 //int g_sysfile_appid = -1; //char *g_sysfile_filedir; //byte_t g_Master; //byte_t g_protect_file; //0:不召唤保护录播文件 1:召唤保护录播文件 //apr_time_t g_file_valid_time; //只召唤指定时间段中的文件(分钟) //byte_t g_file_name_len; //文件名允许存储的最大长度,默认为40,设为0时表示任意长度 //byte_t g_file_time_from; //文件有效时间取值何处,0是依赖于文件名,1取系统时间 static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data); static apr_status_t pt61850app_init(); //static apr_status_t app_process_command(command_t *cmd); /////////////////////////////////////////////////////////////////////////////// extern int three_secs_enabled; /////////////////////////////////////////////////////////////////////////////// //WW 2023-08-22 start int server_socket = -1; extern int g_iOTLFlag; //WW 2023-08-22 end /////////////////////////////////////////////////////////////////////////////// static apr_status_t pt61850app_init() { apr_status_t rv; if ((g_pt61850app = apr_pcalloc(g_run_pool, sizeof(pt61850app_t))) == NULL) return APR_ENOMEM; rv = apr_pool_create(&(g_pt61850app->tmp_pool), g_root_pool); if (rv != APR_SUCCESS) { return rv; } g_pt61850app->chnl_counts = 0; g_pt61850app->initNum = 0; return APR_SUCCESS; } static apr_status_t allocate_LD_chnl_ext_mem() { int iedno, cpuno, chnl_no; ied_t* ied; ied_usr_t* ied_usr; chnl_usr_t* chnl_usr; g_pt61850app->chnl_counts = 0; for (iedno = 0; iedno < g_node->n_clients; iedno++) { ied = g_node->clients[iedno]; ied_usr = apr_pcalloc(g_init_pool, sizeof(ied_usr_t)); ied->usr_ext = ied_usr; if (ied_usr == NULL) return APR_ENOMEM; ied_usr->last_call_wavelist_time = sGetMsTime() + g_pt61850app->giTime * 1000 * 0.5; ied_usr->LD_info = apr_pcalloc(g_init_pool, ied->cpucount * sizeof(LD_info_t)); if (ied_usr->LD_info == NULL) return APR_ENOMEM; for (cpuno = 0; cpuno < ied->cpucount; cpuno++) { ied_usr->LD_info[cpuno].ied = ied; ied_usr->LD_info[cpuno].cpuno = ied->cpuinfo[cpuno].addr; ied_usr->LD_info[cpuno].ht_fcd = apr_hash_make(g_init_pool); ied_usr->LD_info[cpuno].ht_full_fcda = apr_hash_make(g_init_pool); ied_usr->LD_info[cpuno].rptcount = 0; } for (chnl_no = 0; chnl_no < ied->chncount; chnl_no++) { chnl_usr = apr_pcalloc(g_init_pool, sizeof(chnl_usr_t)); ied->channel[chnl_no].connect = chnl_usr; chnl_usr->chnl = &(ied->channel[chnl_no]); chnl_usr->chnl_id = chnl_no; chnl_usr->m_state = CHANNEL_DISCONNECTED; chnl_usr->m_ClosedMsTime = NEXT_CONNECT_TIME * (-1); } g_pt61850app->chnl_counts += ied->chncount; } return APR_SUCCESS; } static apr_status_t read_DEV_idx_from_db() { int ret; int iedno, cpuno; ied_t* ied; ied_usr_t* ied_usr; LD_info_t* LD_info; loginfo_t* loginfo = NULL; int len, tmp; for (iedno = 0; iedno < g_node->n_clients; iedno++) { ied = g_node->clients[iedno]; ied_usr = GET_IEDEXT_ADDR(ied); //read_DEV_Index_from_db(ied->channel[0].addr, &ied_usr->dev_idx); for (cpuno = 0; cpuno < ied->cpucount; cpuno++) { LD_info = &(ied_usr->LD_info[cpuno]); if (LD_info->LD_name == NULL) continue; len = strlen(LD_info->LD_name); tmp = LD_info->LD_name[len - 1] - '0'; LD_info->line_id = ied_usr->dev_idx * 10 + tmp; //ret = read_line_infos_from_db(LD_info->line_id, &LD_info->SubV_Index,&LD_info->Dev_Index,&LD_info->Sub_Index,&LD_info->GD_Index); if (ret != TRUE) LD_info->line_id = -1; if (LD_info->loginfo) { loginfo = LD_info->loginfo[0]; //read_updatetime_from_db(ied->channel[0].addr, &loginfo->start_time); } } } return APR_SUCCESS; } apr_status_t init_rdb() { apr_status_t rv; // driver_t* driver; rv = apr_pool_create(&g_init_pool, g_root_pool); if (rv != APR_SUCCESS) { return rv; } rv = apr_pool_create(&g_run_pool, g_root_pool); if (rv != APR_SUCCESS) { return rv; } rv = apr_pool_create(&g_temp_dev_pool, g_root_pool); if (rv != APR_SUCCESS) { return rv; } g_node = apr_pcalloc(g_run_pool, sizeof(node_t)); if (rv != APR_SUCCESS) { return rv; } //my_info = apr_pcalloc(g_run_pool,sizeof(ied_info_t)); rv = pt61850app_init(); if (rv != APR_SUCCESS) { return rv; } /*rv = parse_json_cfg(); if ( rv != APR_SUCCESS) { echo_errg("Failed to parse json define xml file! \n"); return rv; }*/ init_config(); GetServerIndexFromDB(); /*lnk10-10*/ //添加测试web接口 //rv = parse_device_web_test_ext(); //rv = parse_device_web_test_dev(); //rv = parse_device_web_test_front_read(); //rv = parse_device_web_test_front_write(); rv = parse_device_cfg_web(); //rv = parse_device_cfg(); //rv = parse_device_cfg_json(); //rv = parse_device_cfg_pg(); if (rv != APR_SUCCESS) { echo_errg("Parsed device config xml file with error,try to run! \n"); return rv; } /*lnk10-10*/ //rv = parse_line_cfg_web(); 合并到终端台账 //rv = parse_line_cfg(); //rv = parse_line_cfg_pg(); /*lnk10-10*/ rv = parse_model_cfg_web(); if (rv != APR_SUCCESS) { echo_errg("Parsed model with error,try to run! \n"); return rv; } //OTL_Select_xmlModel(); //xml模型数据库读取 Set_xml_nodeinfo();//解析xml模型 rv = parse_rpt_log_ini();//报告块初始化 if (rv != APR_SUCCESS) { echo_errg("Failed to parse report log define ini file! \n"); return rv; } if (app_get_private_config(g_my_conf_fname) != APR_SUCCESS) { echo_errg("Failed when processing private configuration\n"); return APR_EGENERAL; } init_rem_dib_table(); return APR_SUCCESS; } extern int SOCKETENABLE; extern int HTTPENABLE; /*--------------------------- 规约初始化 -----------------------------------*/ apr_status_t run_protocol() { apr_status_t rv; apr_thread_t* rtdb_thread; // apr_thread_t* mms_thread; static apr_threadattr_t* worker_attr = NULL; //lnk20250214//单连模式,进程先通过进程号0获取所有台账,然后再更新自己的进程号 if (g_onlyIP[0] != 0 && g_front_seg_index == 0 && g_front_seg_num >= 10){ //这是web端控制打开的单连进程 g_front_seg_index = g_front_seg_num; //更新进程号为:为这个单连进程设置的进程号,用来控制实时日志 } init_MMS(); if (worker_attr == NULL) if ((rv = apr_threadattr_create(&worker_attr, g_run_pool)) != APR_SUCCESS) return rv; if ((rv = apr_threadattr_detach_set(worker_attr, 1)) != APR_SUCCESS) return rv; if ((rv = apr_threadattr_stacksize_set(worker_attr, 1920 * 1024)) != APR_SUCCESS) return rv; rv = apr_threadattr_guardsize_set(worker_attr, 4096); if (rv != APR_SUCCESS && rv != APR_ENOTIMPL) return rv; if ((rv = apr_thread_create(&rtdb_thread, worker_attr, rtdb_worker, NULL, g_run_pool)) != APR_SUCCESS) return rv; try_start_kafka_thread();//mq线程 //lnk20241213添加mq消费者线程 try_start_mqconsumer_thread(); ///////////////////WW 2023-08-22 增加数据库和WebSocket线程 if (g_onlyIP[0] != 0 || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_ALL_DATA_BASE_NODE_ID) { printf("g_onlyIP[0] != 0!\n\a"); //单连进程不打开socket、http、测试线程 } else //socket、http、测试线程的开启 { printf("g_onlyIP[0] == 0!\n\a"); if (1 == SOCKETENABLE) { server_socket = socket(AF_INET, SOCK_STREAM, 0); if (server_socket == -1) { printf("Web Socket failed,error msg=%s\n\a", strerror(errno)); exit(1); } int ServerPort = 13000;//WW 这里后面需要从数据库的表中读取 if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集 ServerPort = SOCKET_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index; else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召 ServerPort = SOCKET_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index; } else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {//3秒采集 ServerPort = SOCKET_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index; } else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波 ServerPort = SOCKET_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index; } struct sockaddr_in server_sockaddr; memset(&server_sockaddr, 0, sizeof(server_sockaddr)); server_sockaddr.sin_family = AF_INET; server_sockaddr.sin_port = htons(ServerPort); server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(server_socket, (struct sockaddr*)&server_sockaddr, sizeof(server_sockaddr)) == -1) { printf("bind failed, error msg = %s\n\a", strerror(errno)); printf("bind ServerPort is = %s\n\a", ServerPort); exit(1); } if (listen(server_socket, 20) == -1) { printf("listen server_socket= %d,ServerPort= %d failed,error msg = %s\n\a", server_socket, ServerPort, strerror(errno)); exit(1); } printf("\n listen server_socket= %d,ServerPort= %d,wait for Web Socket client connecting......\n", server_socket, ServerPort); printf("try_start_socket_thread \n"); try_start_socket_thread(); } if (1 == HTTPENABLE) { //lnk20241029增加http线程/////////////////////////////////////////////////////////////////////////////////////////////// if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集 HTTP_PORT = HTTP_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index; else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召 HTTP_PORT = HTTP_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index; } else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {//3秒采集 HTTP_PORT = HTTP_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index; } else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波 HTTP_PORT = HTTP_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index; } printf("try_start_web_http_thread \n"); try_start_web_http_thread(); printf("try_start_http_thread \n"); try_start_http_thread(); //lnk20241029增加http线程/////////////////////////////////////////////////////////////////////////////////////////////////// } if (1 == G_TEST_FLAG) { //lnk添加mq模拟测试 printf("try_start_mqtest_thread \n"); try_start_mqtest_thread(0,NULL); } } //lnk删除数据库线程 #if 0 if (1 == g_iOTLFlag) { printf("try_start_sql_thread \n"); try_start_sql_thread(); } else printf("sql_thread ignore \n"); #endif printf("try_start_ontimer_thread \n"); try_start_ontimer_thread(); //OTLTestSelect();//测试数据库连接 ///////////////////WW end return APR_SUCCESS; } extern uint32_t g_dead_lock_counter; extern uint32_t g_thread_blocked_times; /*--------------------------- 连接实时库线程 -----------------------------------*/ static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data) { // apr_event_t event; // command_t cmd[1]; // int i =0; /* Maintenance the clients request */ while (1) { /*测试用发送rocketmq消息 lnk10-10*/ //producer_send0(); //调试用 printf("check error4 !!!!!!!!!!!!!!\n"); doCommService();//处理61850消息 //调试用 printf("check error5 !!!!!!!!!!!!!!\n"); check_3s_config();//3秒数据进程读取3秒触发 //调试用 printf("check error6 !!!!!!!!!!!!!!\n"); pthread_mutex_lock(&mtx); CheckNextNotConnectedChannel();//所有长连接进程判断连接状态 pthread_mutex_unlock(&mtx); //调试用 printf("check error7 !!!!!!!!!!!!!!\n"); pthread_mutex_lock(&mtx); CheckAllConnectedChannel();//触发报告、日志补召、发送心跳 pthread_mutex_unlock(&mtx); //调试用 printf("check error8 !!!!!!!!!!!!!!\n"); //check_recall_config();//补召进程读取补召消息 create_recall_xml();//生成待补招xml文件 //调试用 printf("check error9 !!!!!!!!!!!!!!\n"); check_ledger_update();//lnk20250113读取台账更新,触发台账更新 //调试用 printf("check error3 !!!!!!!!!!!!!!\n"); //Check_Recall_Config(); /*if ((g_protect_file) && (g_pt61850app->initNum>=MIN_INIT_NUM) ) { tryCallWaveList_in_AllIeds(); }*/ //clear_old_comtrade_files(); check_disk_quota();//判断磁盘空间 //调试用 printf("check error1 !!!!!!!!!!!!!!\n"); apr_pool_clear(g_pt61850app->tmp_pool);//清除临时缓存 //调试用 printf("check error2 !!!!!!!!!!!!!!\n"); g_dead_lock_counter = 0; g_thread_blocked_times = 0;//监控线程 } echo_msg("rtdb worker thread terminated..."); } //////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////// void Set_val_from_61850rpt(element_t* elem, double v) { GET_DOTEXT_ADDR(elem)->DataStatus |= VALUE_REACHED; GET_DOTEXT_ADDR(elem)->m_v = v; } int Set_q_from_61850rpt(char* q) { int quality = 0; if (q[0] == '0' && q[1] == '0') quality = 0; else quality = 1; return quality; //set_rpt_QualityFlag(quality); } #define TIME_T_2036 (66*365* SECONDS_PER_DAY) apr_time_t convert_btod_to_apr_time(MMS_BTOD* btod) { MMS_BTIME6 btime6; btime6.day = btod->day; btime6.ms = btod->ms; return convert_btime6_to_apr_time(&btime6); } apr_time_t convert_btime6_to_apr_time(MMS_BTIME6* bTime6) { apr_time_t ticks; if ((TIME_T_1984_JAN_1 + (bTime6->day * SECONDS_PER_DAY)) > TIME_T_2036) { echo_warn("时标错误,超过2036年!"); } ticks = TIME_T_1984_JAN_1; ticks += (bTime6->day * SECONDS_PER_DAY); ticks *= 1000; ticks += bTime6->ms; ticks *= 1000; return ticks; } /* 61850 位 属性名称 值 0-1 Good 00 Invalid 01 Reserved 10 Questionable11 2 Overflow 3 OutofRange 4 BadReference 5 Oscillatory 6 Failure 7 OldData 8 Inconsistent 9 Inaccurate 10 Source 0(process)/1(Substituted) 11 Test 12 OperatorBlocked */ /* 60870 位 属性名称 值 0 溢出 1 保留 2 保留 3 保留 4 被封锁 5 被取代 6 非当前值 7 无效 */ byte_t get_mx_q_from_61850(char* q_61850) { QDS q; q.bits = 0; if (set_mx_q) { if (!strlen(q_61850)) { return q.bits; } if (!strcmp("0000000000000", q_61850)) { return q.bits; } if (q_61850[0] == '0' && q_61850[1] == '1') q.parts.IV = 1; if (q_61850[0] == '1' && q_61850[1] == '1' && q_61850[7] == '1') q.parts.NT = 1; if (q_61850[2] == '1') q.parts.OV = 1; if (q_61850[12] == '1') q.parts.BL = 1; if (q_61850[10] == '1') q.parts.SB = 1; } return q.bits; } /* 60870 位 属性名称 值 0 保留 1 保留 2 保留 3 保留 4 被封锁 5 被取代 6 非当前值 7 无效 */ byte_t get_st_q_from_61850(char* q_61850) { SIQ q; q.bits = 0; if (!strlen(q_61850)) { return q.bits; } if (!strcmp("0000000000000", q_61850)) { return q.bits; } if (q_61850[0] == '0' && q_61850[1] == '1') q.parts.IV = 1; if (q_61850[0] == '1' && q_61850[1] == '1' && q_61850[7] == '1') q.parts.NT = 1; if (q_61850[12] == '1') q.parts.BL = 1; if (q_61850[10] == '1') q.parts.SB = 1; return q.bits; } byte_t get_pulse_q_from_61850(char* q_61850) { if (!strlen(q_61850)) { return 0; } if (q_61850[0] == '0' && q_61850[1] == '1') return 1; else return 0; } /////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////