Files
microser/mms/rdb_client.c
2025-01-16 16:17:01 +08:00

592 lines
15 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file: $RCSfile: rdb_client.c,v $
* @brief: $PROFIBUS <-> SSRTDB interaction
*
* @version: $Revision: 1.11 $
* @date: $Date: 2020/10/28 05:21:18 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: rdb_client.c,v 1.11 2020/10/28 05:21:18 lizhongming Exp $
*/
#include <string.h>
#include "rdb_client.h"
#include "db_interface.h"
#include "node.h"
#include <pthread.h>//lnk20250114: pthread mutex added to protect the device ledger
/*lnk10-10 */////////////////////////////////
/* Port / test / front-segment settings defined in another translation unit. */
extern int HTTP_PORT;
extern int SOCKET_PORT;
extern int G_TEST_FLAG;
extern int g_front_seg_index;
extern int g_front_seg_num;
#include "../include/rocketmq/SimpleProducer.h"
////////////////////////////////////////////
#ifdef DEBUG_SISCO
SD_CONST static ST_CHAR* SD_CONST thisFileName = __FILE__;
#endif
extern RPT_TYPEIDS g_rpt_typeids;
//ied_info_t *my_info;
extern apr_pool_t* g_root_pool;
/* When non-zero, get_mx_q_from_61850() maps the 61850 quality string; when zero it returns 0. */
uint8_t set_mx_q;
//rdb_t *g_rdb = NULL;
/* Node descriptor; allocated from g_run_pool in init_rdb(). */
node_t* g_node = NULL;
extern char g_my_conf_fname[256];
/* Sub-pools of g_root_pool, all created in init_rdb(). */
apr_pool_t* g_init_pool;
apr_pool_t* g_run_pool;
apr_pool_t* g_temp_dev_pool;
//lnk20250114: mutex added for the device ledger; rtdb_worker() holds it around the channel checks
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
extern char g_onlyIP[255]; //connect directly to one IP; only for testing convenience
//added for tap-position adjustment at the Wuxi Xijing substation
//NOTE(review): set_mx_q is already tentatively defined above; the repeat is legal C but redundant
uint8_t set_mx_q;
/* Global application state; allocated by pt61850app_init(). */
pt61850app_t* g_pt61850app;
//application_t g_sysfile_app; //system file library application info structure
//int g_sysfile_appid = -1;
//char *g_sysfile_filedir;
//byte_t g_Master;
//byte_t g_protect_file; //0: do not call protection waveform-record files  1: call them
//apr_time_t g_file_valid_time; //only call files within the given time window (minutes)
//byte_t g_file_name_len; //maximum stored file-name length, default 40; 0 means any length
//byte_t g_file_time_from; //source of the file valid time: 0 = taken from file name, 1 = system time
static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data);
static apr_status_t pt61850app_init();
//static apr_status_t app_process_command(command_t *cmd);
///////////////////////////////////////////////////////////////////////////////
extern int three_secs_enabled;
///////////////////////////////////////////////////////////////////////////////
//WW 2023-08-22 start
/* Listening socket created in run_protocol(); -1 until then. */
int server_socket = -1;
extern int g_iOTLFlag;
//WW 2023-08-22 end
///////////////////////////////////////////////////////////////////////////////
/**
 * Allocate the global pt61850app_t state and create its scratch pool.
 *
 * The structure is zero-allocated from g_run_pool and published through
 * g_pt61850app. Its tmp_pool is created as a child of g_root_pool.
 *
 * @return APR_SUCCESS on success, APR_ENOMEM when the allocation yields NULL,
 *         otherwise the status reported by apr_pool_create().
 */
static apr_status_t pt61850app_init()
{
    apr_status_t status;

    g_pt61850app = apr_pcalloc(g_run_pool, sizeof(pt61850app_t));
    if (g_pt61850app == NULL) {
        return APR_ENOMEM;
    }

    status = apr_pool_create(&g_pt61850app->tmp_pool, g_root_pool);
    if (status != APR_SUCCESS) {
        return status;
    }

    /* Counters start from zero (explicit, although the struct is zero-filled). */
    g_pt61850app->chnl_counts = 0;
    g_pt61850app->initNum = 0;

    return APR_SUCCESS;
}
/**
 * Allocate the per-IED, per-logical-device and per-channel extension
 * structures for every client in g_node, all from g_init_pool.
 *
 * For each IED this attaches an ied_usr_t (ied->usr_ext), an array of
 * LD_info_t with one entry per CPU, and one chnl_usr_t per channel
 * (ied->channel[i].connect). g_pt61850app->chnl_counts is recomputed as the
 * total number of channels over all IEDs.
 *
 * @return APR_SUCCESS, or APR_ENOMEM as soon as any allocation returns NULL.
 */
static apr_status_t allocate_LD_chnl_ext_mem()
{
    int iedno, cpuno, chnl_no;
    ied_t* ied;
    ied_usr_t* ied_usr;
    chnl_usr_t* chnl_usr;

    g_pt61850app->chnl_counts = 0;
    for (iedno = 0; iedno < g_node->n_clients; iedno++) {
        ied = g_node->clients[iedno];

        ied_usr = apr_pcalloc(g_init_pool, sizeof(ied_usr_t));
        ied->usr_ext = ied_usr;
        if (ied_usr == NULL) {
            return APR_ENOMEM;
        }
        /* First wave-list call is scheduled half a giTime interval from now. */
        ied_usr->last_call_wavelist_time = sGetMsTime() + g_pt61850app->giTime * 1000 * 0.5;

        ied_usr->LD_info = apr_pcalloc(g_init_pool, ied->cpucount * sizeof(LD_info_t));
        if (ied_usr->LD_info == NULL) {
            return APR_ENOMEM;
        }
        for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
            ied_usr->LD_info[cpuno].ied = ied;
            ied_usr->LD_info[cpuno].cpuno = ied->cpuinfo[cpuno].addr;
            ied_usr->LD_info[cpuno].ht_fcd = apr_hash_make(g_init_pool);
            ied_usr->LD_info[cpuno].ht_full_fcda = apr_hash_make(g_init_pool);
            ied_usr->LD_info[cpuno].rptcount = 0;
        }

        for (chnl_no = 0; chnl_no < ied->chncount; chnl_no++) {
            chnl_usr = apr_pcalloc(g_init_pool, sizeof(chnl_usr_t));
            /* Check before dereferencing, like the other allocations above. */
            if (chnl_usr == NULL) {
                return APR_ENOMEM;
            }
            ied->channel[chnl_no].connect = chnl_usr;
            chnl_usr->chnl = &(ied->channel[chnl_no]);
            chnl_usr->chnl_id = chnl_no;
            chnl_usr->m_state = CHANNEL_DISCONNECTED;
            /* NOTE(review): negative close time presumably lets the first
             * connect attempt happen without waiting NEXT_CONNECT_TIME - confirm. */
            chnl_usr->m_ClosedMsTime = NEXT_CONNECT_TIME * (-1);
        }
        g_pt61850app->chnl_counts += ied->chncount;
    }
    return APR_SUCCESS;
}
/**
 * Derive LD_info->line_id for every named logical device of every client IED.
 *
 * line_id is computed as ied_usr->dev_idx * 10 + (last character of LD_name
 * minus '0'). Logical devices without a name are skipped; an empty name
 * yields line_id = -1.
 *
 * The database lookups this function was written around are currently
 * commented out. `ret` used to be tested without ever being assigned
 * (undefined behavior); it is now initialized to TRUE so the validation
 * check is well defined and takes effect again once the lookup is re-enabled.
 * NOTE(review): with the lookup disabled the computed line_id is kept;
 * confirm that callers do not expect -1 in this state.
 *
 * @return always APR_SUCCESS.
 */
static apr_status_t read_DEV_idx_from_db()
{
    int ret = TRUE; /* no lookup is performed, so there is no failure to report */
    int iedno, cpuno;
    ied_t* ied;
    ied_usr_t* ied_usr;
    LD_info_t* LD_info;
    loginfo_t* loginfo = NULL;
    size_t len;
    int tmp;

    for (iedno = 0; iedno < g_node->n_clients; iedno++) {
        ied = g_node->clients[iedno];
        ied_usr = GET_IEDEXT_ADDR(ied);
        //read_DEV_Index_from_db(ied->channel[0].addr, &ied_usr->dev_idx);
        for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
            LD_info = &(ied_usr->LD_info[cpuno]);
            if (LD_info->LD_name == NULL) {
                continue;
            }
            len = strlen(LD_info->LD_name);
            if (len == 0) {
                /* No trailing character to read; avoid indexing LD_name[-1]. */
                LD_info->line_id = -1;
                continue;
            }
            tmp = LD_info->LD_name[len - 1] - '0';
            LD_info->line_id = ied_usr->dev_idx * 10 + tmp;
            //ret = read_line_infos_from_db(LD_info->line_id, &LD_info->SubV_Index,&LD_info->Dev_Index,&LD_info->Sub_Index,&LD_info->GD_Index);
            if (ret != TRUE) {
                LD_info->line_id = -1;
            }
            if (LD_info->loginfo) {
                loginfo = LD_info->loginfo[0];
                //read_updatetime_from_db(ied->channel[0].addr, &loginfo->start_time);
            }
        }
    }
    (void)loginfo; /* only consumed by the disabled lookup above */
    return APR_SUCCESS;
}
apr_status_t init_rdb()
{
apr_status_t rv;
// driver_t* driver;
rv = apr_pool_create(&g_init_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
rv = apr_pool_create(&g_run_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
rv = apr_pool_create(&g_temp_dev_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
g_node = apr_pcalloc(g_run_pool, sizeof(node_t));
if (rv != APR_SUCCESS) {
return rv;
}
//my_info = apr_pcalloc(g_run_pool,sizeof(ied_info_t));
rv = pt61850app_init();
if (rv != APR_SUCCESS) {
return rv;
}
/*rv = parse_json_cfg();
if ( rv != APR_SUCCESS) {
echo_errg("Failed to parse json define xml file! \n");
return rv;
}*/
init_config();
GetServerIndexFromDB();
/*lnk10-10*/
//添加测试web接口
//rv = parse_device_web_test_ext();
//rv = parse_device_web_test_dev();
//rv = parse_device_web_test_front_read();
//rv = parse_device_web_test_front_write();
rv = parse_device_cfg_web();
//rv = parse_device_cfg();
//rv = parse_device_cfg_json();
//rv = parse_device_cfg_pg();
if (rv != APR_SUCCESS) {
echo_errg("Parsed device config xml file with error,try to run! \n");
return rv;
}
/*lnk10-10*/
//rv = parse_line_cfg_web(); 合并到终端台账
//rv = parse_line_cfg();
//rv = parse_line_cfg_pg();
/*lnk10-10*/
rv = parse_model_cfg_web();
if (rv != APR_SUCCESS) {
echo_errg("Parsed model with error,try to run! \n");
return rv;
}
//OTL_Select_xmlModel(); //xml模型数据库读取
Set_xml_nodeinfo();//解析xml模型
rv = parse_rpt_log_ini();//报告块初始化
if (rv != APR_SUCCESS) {
echo_errg("Failed to parse report log define ini file! \n");
return rv;
}
if (app_get_private_config(g_my_conf_fname) != APR_SUCCESS) {
echo_errg("Failed when processing private configuration\n");
return APR_EGENERAL;
}
init_rem_dib_table();
return APR_SUCCESS;
}
extern int SOCKETENABLE;
extern int HTTPENABLE;
/*--------------------------- Protocol start-up -----------------------------------*/
/**
 * Start the MMS stack and all worker threads of this process.
 *
 * - Calls init_MMS() and starts the detached rtdb_worker thread
 *   (1920 KiB stack, 4096-byte guard where supported).
 * - Starts the kafka and MQ consumer threads.
 * - Unless g_onlyIP is set or this node is one of the history/recall-all
 *   nodes: when SOCKETENABLE == 1, opens a TCP listening socket and starts
 *   the socket thread; when HTTPENABLE is non-zero, offsets HTTP_PORT by node
 *   id and front segment index and starts the HTTP threads.
 * - Optionally starts the MQ test thread (G_TEST_FLAG == 1) and the SQL
 *   thread (g_iOTLFlag == 1), then always starts the on-timer thread.
 *
 * Failure to create, bind or listen on the socket terminates the process
 * with exit(1).
 *
 * @return APR_SUCCESS, or the APR status of the failing thread-attribute or
 *         thread-creation call.
 */
apr_status_t run_protocol()
{
    apr_status_t rv;
    apr_thread_t* rtdb_thread;
    static apr_threadattr_t* worker_attr = NULL;

    init_MMS();

    /* The attribute object is created once and reused on later calls. */
    if (worker_attr == NULL) {
        if ((rv = apr_threadattr_create(&worker_attr, g_run_pool)) != APR_SUCCESS) {
            return rv;
        }
    }
    if ((rv = apr_threadattr_detach_set(worker_attr, 1)) != APR_SUCCESS) {
        return rv;
    }
    if ((rv = apr_threadattr_stacksize_set(worker_attr, 1920 * 1024)) != APR_SUCCESS) {
        return rv;
    }
    /* A platform without guard-size support is not treated as an error. */
    rv = apr_threadattr_guardsize_set(worker_attr, 4096);
    if (rv != APR_SUCCESS && rv != APR_ENOTIMPL) {
        return rv;
    }
    if ((rv = apr_thread_create(&rtdb_thread, worker_attr, rtdb_worker, NULL, g_run_pool)) != APR_SUCCESS) {
        return rv;
    }

    try_start_kafka_thread();
    /* lnk20241213: MQ consumer thread */
    try_start_mqconsumer_thread();

    /* WW 2023-08-22: database and WebSocket threads */
    if (g_onlyIP[0] != 0 || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)
    {
        printf("g_onlyIP[0] != 0!\n\a");
    }
    else
    {
        printf("g_onlyIP[0] == 0!\n\a");
        if (1 == SOCKETENABLE)
        {
            server_socket = socket(AF_INET, SOCK_STREAM, 0);
            if (server_socket == -1) {
                printf("Web Socket failed,error msg=%s\n\a", strerror(errno));
                exit(1);
            }

            /* WW: default port; to be read from a database table later. */
            int ServerPort = 13000;
            if (g_node_id == STAT_DATA_BASE_NODE_ID) {              /* statistics acquisition */
                ServerPort = SOCKET_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
            }
            else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {   /* history recall */
                ServerPort = SOCKET_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
            }
            else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {   /* 3-second acquisition */
                ServerPort = SOCKET_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index;
            }
            else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {      /* transient waveform recording */
                ServerPort = SOCKET_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
            }

            struct sockaddr_in server_sockaddr;
            memset(&server_sockaddr, 0, sizeof(server_sockaddr));
            server_sockaddr.sin_family = AF_INET;
            server_sockaddr.sin_port = htons(ServerPort);
            server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
            if (bind(server_socket, (struct sockaddr*)&server_sockaddr, sizeof(server_sockaddr)) == -1)
            {
                printf("bind failed, error msg = %s\n\a", strerror(errno));
                /* ServerPort is an int: it must be printed with %d, not %s. */
                printf("bind ServerPort is = %d\n\a", ServerPort);
                exit(1);
            }
            if (listen(server_socket, 20) == -1)
            {
                printf("listen server_socket= %d,ServerPort= %d failed,error msg = %s\n\a", server_socket, ServerPort, strerror(errno));
                exit(1);
            }
            printf("\n listen server_socket= %d,ServerPort= %d,wait for Web Socket client connecting......\n", server_socket, ServerPort);
            printf("try_start_socket_thread \n");
            try_start_socket_thread();
        }
        if (HTTPENABLE)
        {
            /* lnk20241029: HTTP threads. HTTP_PORT is offset in place by node
             * id and front segment index. */
            if (g_node_id == STAT_DATA_BASE_NODE_ID) {              /* statistics acquisition */
                HTTP_PORT = HTTP_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
            }
            else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {   /* history recall */
                HTTP_PORT = HTTP_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
            }
            else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {   /* 3-second acquisition */
                HTTP_PORT = HTTP_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index;
            }
            else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {      /* transient waveform recording */
                HTTP_PORT = HTTP_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
            }
            printf("try_start_web_http_thread \n");
            try_start_web_http_thread();
            printf("try_start_http_thread \n");
            try_start_http_thread();
        }
    }

    if (1 == G_TEST_FLAG) {
        /* lnk: MQ simulation test thread */
        printf("try_start_mqtest_thread \n");
        try_start_mqtest_thread(0,NULL);
    }

    if (1 == g_iOTLFlag) {
        printf("try_start_sql_thread \n");
        try_start_sql_thread();
    }
    else {
        printf("sql_thread ignore \n");
    }

    printf("try_start_ontimer_thread \n");
    try_start_ontimer_thread();
    /* WW end */
    return APR_SUCCESS;
}
extern uint32_t g_dead_lock_counter;
extern uint32_t g_thread_blocked_times;
/*--------------------------- RTDB connection worker thread -----------------------------------*/
/**
 * Main service loop of the worker thread started by run_protocol().
 *
 * Each pass services 61850 communication, polls the configuration triggers,
 * runs the two channel checks (each under its own lock/unlock of mtx),
 * clears the per-pass scratch pool and resets the two watchdog counters.
 * The loop has no exit condition, so the statement after it is never reached.
 *
 * @param thd   APR thread handle (unused).
 * @param data  thread argument (unused).
 */
static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data)
{
    for (;;) {
        /* Handle 61850 messages. */
        doCommService();

        /* 3-second data process: poll the 3-second trigger. */
        check_3s_config();

        /* Connection-state check for the long-lived connections. */
        pthread_mutex_lock(&mtx);
        CheckNextNotConnectedChannel();
        pthread_mutex_unlock(&mtx);

        /* Trigger reports, log recall and heartbeats on connected channels. */
        pthread_mutex_lock(&mtx);
        CheckAllConnectedChannel();
        pthread_mutex_unlock(&mtx);

        /* Generate the pending-recall XML file. */
        create_recall_xml();

        /* lnk20250113: poll the ledger-update trigger and update the ledger. */
        check_ledger_update();

        /* Check disk space. */
        check_disk_quota();

        /* Drop everything allocated from the temporary pool during this pass. */
        apr_pool_clear(g_pt61850app->tmp_pool);

        /* Thread monitoring: reset both counters at the end of every pass. */
        g_dead_lock_counter = 0;
        g_thread_blocked_times = 0;
    }
    echo_msg("rtdb worker thread terminated...");
}
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
/**
 * Record a value on an element's extension data.
 *
 * Sets the VALUE_REACHED bit in DataStatus and stores v in m_v, both reached
 * through GET_DOTEXT_ADDR(elem).
 *
 * @param elem  element to update; not checked for NULL here.
 * @param v     value to store.
 */
void Set_val_from_61850rpt(element_t* elem, double v)
{
/* Flag first, then the value: existing DataStatus bits are preserved. */
GET_DOTEXT_ADDR(elem)->DataStatus |= VALUE_REACHED;
GET_DOTEXT_ADDR(elem)->m_v = v;
}
/**
 * Reduce a 61850 quality bit-string to a good/not-good flag.
 *
 * Only the first two characters are inspected: "00" is reported as 0, any
 * other prefix (including an empty string) as 1.
 *
 * @param q  NUL-terminated quality string of '0'/'1' characters; must not be NULL.
 * @return 0 when the string starts with "00", otherwise 1.
 */
int Set_q_from_61850rpt(char* q)
{
    /* q[1] is only read when q[0] is '0', so an empty string is safe. */
    const int starts_with_00 = (q[0] == '0') && (q[1] == '0');

    return starts_with_00 ? 0 : 1;
}
/* Upper bound, in seconds, used by convert_btime6_to_apr_time() for its
 * "beyond 2036" warning: 66 years of 365 days each. */
#define TIME_T_2036 (66*365* SECONDS_PER_DAY)
/**
 * Convert an MMS_BTOD time stamp to apr_time_t.
 *
 * Copies the day and ms fields into an MMS_BTIME6 and delegates to
 * convert_btime6_to_apr_time().
 *
 * @param btod  source time stamp; must not be NULL.
 * @return the value returned by convert_btime6_to_apr_time().
 */
apr_time_t convert_btod_to_apr_time(MMS_BTOD* btod)
{
    MMS_BTIME6 as_btime6;

    as_btime6.ms = btod->ms;
    as_btime6.day = btod->day;

    return convert_btime6_to_apr_time(&as_btime6);
}
/**
 * Convert an MMS_BTIME6 (days counted from TIME_T_1984_JAN_1, plus
 * milliseconds within the day) to apr_time_t.
 *
 * The result is ((TIME_T_1984_JAN_1 + day * SECONDS_PER_DAY) * 1000 + ms) * 1000.
 * The arithmetic is carried out in apr_time_t so that large day counts cannot
 * overflow a plain int, neither in the range check nor in the conversion.
 *
 * A time stamp beyond TIME_T_2036 only produces a warning; the value is
 * still converted and returned.
 *
 * @param bTime6  source time stamp; must not be NULL.
 * @return converted time.
 */
apr_time_t convert_btime6_to_apr_time(MMS_BTIME6* bTime6)
{
    apr_time_t seconds;
    apr_time_t ticks;

    /* Widen before multiplying: day * SECONDS_PER_DAY may not fit in an int. */
    seconds = (apr_time_t)TIME_T_1984_JAN_1
            + (apr_time_t)bTime6->day * (apr_time_t)SECONDS_PER_DAY;

    if (seconds > (apr_time_t)TIME_T_2036) {
        echo_warn("时标错误超过2036年");
    }

    ticks = seconds;
    ticks *= 1000;          /* seconds -> milliseconds */
    ticks += bTime6->ms;
    ticks *= 1000;          /* milliseconds -> microseconds */
    return ticks;
}
/*
 * IEC 61850 quality bit-string (one character per bit, index = bit number):
 *   bits 0-1  validity: 00 Good, 01 Invalid, 10 Reserved, 11 Questionable
 *   bit  2    Overflow
 *   bit  3    OutofRange
 *   bit  4    BadReference
 *   bit  5    Oscillatory
 *   bit  6    Failure
 *   bit  7    OldData
 *   bit  8    Inconsistent
 *   bit  9    Inaccurate
 *   bit  10   Source: 0 (process) / 1 (substituted)
 *   bit  11   Test
 *   bit  12   OperatorBlocked
 */
/*
 * IEC 60870 quality descriptor for measured values:
 *   bit 0     overflow
 *   bits 1-3  reserved
 *   bit 4     blocked
 *   bit 5     substituted
 *   bit 6     not topical (not the current value)
 *   bit 7     invalid
 */
/**
 * Map a 61850 quality bit-string to a 60870 measured-value quality byte.
 *
 * The mapping is only applied when the global set_mx_q is non-zero;
 * otherwise 0 is returned. A NULL, empty or all-zero ("0000000000000")
 * string also gives 0. A position beyond the end of the string is treated
 * as not set, so short strings no longer cause out-of-bounds reads.
 *
 *   validity "01"                 -> IV
 *   validity "11" and bit 7 set   -> NT
 *   bit 2  (Overflow)             -> OV
 *   bit 12 (OperatorBlocked)      -> BL
 *   bit 10 (Source substituted)   -> SB
 *
 * @param q_61850  NUL-terminated string of '0'/'1' characters, or NULL.
 * @return the packed quality byte (QDS.bits).
 */
byte_t get_mx_q_from_61850(char* q_61850)
{
    QDS q;
    size_t len;

    q.bits = 0;
    if (!set_mx_q || q_61850 == NULL) {
        return q.bits;
    }

    len = strlen(q_61850);
    if (len == 0) {
        return q.bits;
    }
    if (!strcmp("0000000000000", q_61850)) {
        return q.bits;
    }

    /* len >= 1 here, so q_61850[1] is at worst the terminating NUL. */
    if (q_61850[0] == '0' && q_61850[1] == '1')
        q.parts.IV = 1;
    if (len > 7 && q_61850[0] == '1' && q_61850[1] == '1' && q_61850[7] == '1')
        q.parts.NT = 1;
    if (len > 2 && q_61850[2] == '1')
        q.parts.OV = 1;
    if (len > 12 && q_61850[12] == '1')
        q.parts.BL = 1;
    if (len > 10 && q_61850[10] == '1')
        q.parts.SB = 1;

    return q.bits;
}
/*
 * IEC 60870 quality descriptor for status (single-point) values:
 *   bits 0-3  reserved
 *   bit 4     blocked
 *   bit 5     substituted
 *   bit 6     not topical (not the current value)
 *   bit 7     invalid
 */
/**
 * Map a 61850 quality bit-string to a 60870 status quality byte.
 *
 * A NULL, empty or all-zero ("0000000000000") string gives 0. A position
 * beyond the end of the string is treated as not set, so short strings no
 * longer cause out-of-bounds reads.
 *
 *   validity "01"                 -> IV
 *   validity "11" and bit 7 set   -> NT
 *   bit 12 (OperatorBlocked)      -> BL
 *   bit 10 (Source substituted)   -> SB
 *
 * @param q_61850  NUL-terminated string of '0'/'1' characters, or NULL.
 * @return the packed quality byte (SIQ.bits).
 */
byte_t get_st_q_from_61850(char* q_61850)
{
    SIQ q;
    size_t len;

    q.bits = 0;
    if (q_61850 == NULL) {
        return q.bits;
    }

    len = strlen(q_61850);
    if (len == 0) {
        return q.bits;
    }
    if (!strcmp("0000000000000", q_61850)) {
        return q.bits;
    }

    /* len >= 1 here, so q_61850[1] is at worst the terminating NUL. */
    if (q_61850[0] == '0' && q_61850[1] == '1')
        q.parts.IV = 1;
    if (len > 7 && q_61850[0] == '1' && q_61850[1] == '1' && q_61850[7] == '1')
        q.parts.NT = 1;
    if (len > 12 && q_61850[12] == '1')
        q.parts.BL = 1;
    if (len > 10 && q_61850[10] == '1')
        q.parts.SB = 1;

    return q.bits;
}
/**
 * Derive the pulse quality flag from a 61850 quality bit-string.
 *
 * @param q_61850  NUL-terminated string of '0'/'1' characters; must not be NULL.
 * @return 1 when the validity bits (first two characters) are "01",
 *         0 otherwise, including for an empty string.
 */
byte_t get_pulse_q_from_61850(char* q_61850)
{
    if (strlen(q_61850) == 0) {
        return 0;
    }

    return (q_61850[0] == '0' && q_61850[1] == '1') ? 1 : 0;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////