Files
microser/mms/rdb_client.c
2025-05-30 15:40:20 +08:00

550 lines
14 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file: $RCSfile: rdb_client.c,v $
* @brief: $PROFIBUS 与SSRTDB交互
*
* @version: $Revision: 1.11 $
* @date: $Date: 2020/10/28 05:21:18 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: rdb_client.c,v 1.11 2020/10/28 05:21:18 lizhongming Exp $
*/
#include <string.h>
#include "rdb_client.h"
#include "db_interface.h"
#include "node.h"
#include <pthread.h>//lnk20250114给台账添加互斥锁
#include "../log4cplus/log4.h"//lnk添加log4
/*lnk10-10 */////////////////////////////////
extern int HTTP_PORT;
extern int SOCKET_PORT;
extern int G_TEST_FLAG;
extern int g_front_seg_index;
extern int g_front_seg_num;
#include "../rocketmq/SimpleProducer.h"
#include "../cfg_parse/custom_printf.h"//lnk20250225
////////////////////////////////////////////
#ifdef DEBUG_SISCO
SD_CONST static ST_CHAR* SD_CONST thisFileName = __FILE__;
#endif
extern RPT_TYPEIDS g_rpt_typeids;
extern apr_pool_t* g_root_pool;
uint8_t set_mx_q;
node_t* g_node = NULL;
extern char g_my_conf_fname[256];
apr_pool_t* g_init_pool;
apr_pool_t* g_run_pool;
apr_pool_t* g_temp_dev_pool;
//lnk20250114给台账添加互斥锁
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
extern char g_onlyIP[255]; //直连某个IP仅仅为方便测试
//为无锡西径变调档添加
uint8_t set_mx_q;
pt61850app_t* g_pt61850app;
static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data);
static apr_status_t pt61850app_init();
///////////////////////////////////////////////////////////////////////////////
extern int three_secs_enabled;
///////////////////////////////////////////////////////////////////////////////
//WW 2023-08-22 start
int server_socket = -1;
//WW 2023-08-22 end
///////////////////////////////////////////////////////////////////////////////
static apr_status_t pt61850app_init()
{
apr_status_t rv;
if ((g_pt61850app = apr_pcalloc(g_run_pool, sizeof(pt61850app_t))) == NULL)
return APR_ENOMEM;
rv = apr_pool_create(&(g_pt61850app->tmp_pool), g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
g_pt61850app->chnl_counts = 0;
g_pt61850app->initNum = 0;
return APR_SUCCESS;
}
static apr_status_t allocate_LD_chnl_ext_mem()
{
int iedno, cpuno, chnl_no;
ied_t* ied;
ied_usr_t* ied_usr;
chnl_usr_t* chnl_usr;
g_pt61850app->chnl_counts = 0;
for (iedno = 0; iedno < g_node->n_clients; iedno++) {
ied = g_node->clients[iedno];
ied_usr = apr_pcalloc(g_init_pool, sizeof(ied_usr_t));
ied->usr_ext = ied_usr;
if (ied_usr == NULL)
return APR_ENOMEM;
ied_usr->last_call_wavelist_time = sGetMsTime() + g_pt61850app->giTime * 1000 * 0.5;
ied_usr->LD_info = apr_pcalloc(g_init_pool, ied->cpucount * sizeof(LD_info_t));
if (ied_usr->LD_info == NULL)
return APR_ENOMEM;
for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
ied_usr->LD_info[cpuno].ied = ied;
ied_usr->LD_info[cpuno].cpuno = ied->cpuinfo[cpuno].addr;
ied_usr->LD_info[cpuno].ht_fcd = apr_hash_make(g_init_pool);
ied_usr->LD_info[cpuno].ht_full_fcda = apr_hash_make(g_init_pool);
ied_usr->LD_info[cpuno].rptcount = 0;
}
for (chnl_no = 0; chnl_no < ied->chncount; chnl_no++) {
chnl_usr = apr_pcalloc(g_init_pool, sizeof(chnl_usr_t));
ied->channel[chnl_no].connect = chnl_usr;
chnl_usr->chnl = &(ied->channel[chnl_no]);
chnl_usr->chnl_id = chnl_no;
chnl_usr->m_state = CHANNEL_DISCONNECTED;
chnl_usr->m_ClosedMsTime = NEXT_CONNECT_TIME * (-1);
}
g_pt61850app->chnl_counts += ied->chncount;
}
return APR_SUCCESS;
}
static apr_status_t read_DEV_idx_from_db()
{
int ret;
int iedno, cpuno;
ied_t* ied;
ied_usr_t* ied_usr;
LD_info_t* LD_info;
loginfo_t* loginfo = NULL;
int len, tmp;
for (iedno = 0; iedno < g_node->n_clients; iedno++) {
ied = g_node->clients[iedno];
ied_usr = GET_IEDEXT_ADDR(ied);
for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]);
if (LD_info->LD_name == NULL)
continue;
len = strlen(LD_info->LD_name);
tmp = LD_info->LD_name[len - 1] - '0';
LD_info->line_id = ied_usr->dev_idx * 10 + tmp;
if (ret != TRUE)
LD_info->line_id = -1;
if (LD_info->loginfo) {
loginfo = LD_info->loginfo[0];
}
}
}
return APR_SUCCESS;
}
apr_status_t init_rdb()
{
apr_status_t rv;
rv = apr_pool_create(&g_init_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
rv = apr_pool_create(&g_run_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
rv = apr_pool_create(&g_temp_dev_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
g_node = apr_pcalloc(g_run_pool, sizeof(node_t));
if (rv != APR_SUCCESS) {
return rv;
}
rv = pt61850app_init();
if (rv != APR_SUCCESS) {
return rv;
}
init_config();
GetServerIndexFromDB();
rv = parse_device_cfg_web();
if (rv != APR_SUCCESS) {
echo_errg("Parsed device config xml file with error,try to run! \n");
//char buf[256];
//format_log_msg(buf,sizeof(buf),"前置的%s%d号进程调用web台账接口失败", get_front_msg_from_subdir(), g_front_seg_index);
//log_error("process", buf);
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程调用web台账接口失败", get_front_msg_from_subdir(), g_front_seg_index);
return rv;
}
//台账读取过后初始化各级的日志
init_loggers();
rv = parse_model_cfg_web();
if (rv != APR_SUCCESS) {//不可能
echo_errg("Parsed model with error,try to run! \n");
//char buf[256];
//format_log_msg(buf,sizeof(buf),"前置的%s%d号进程调用web模型接口失败", get_front_msg_from_subdir(), g_front_seg_index);
//log_error("process", buf);
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程调用web模型接口失败", get_front_msg_from_subdir(), g_front_seg_index);
return rv;
}
Set_xml_nodeinfo();//解析xml模型
rv = parse_rpt_log_ini();//报告块初始化
if (rv != APR_SUCCESS) {
echo_errg("Failed to parse report log define ini file! \n");
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程报告初始化失败", get_front_msg_from_subdir(), g_front_seg_index);
return rv;
}
if (app_get_private_config(g_my_conf_fname) != APR_SUCCESS) {
echo_errg("Failed when processing private configuration\n");
DIY_ERRORLOG("process","【ERROR】前置的%s%d号进程读取mms配置失败", get_front_msg_from_subdir(), g_front_seg_index);
return APR_EGENERAL;
}
init_rem_dib_table();
return APR_SUCCESS;
}
extern int SOCKETENABLE;
extern int HTTPENABLE;
/*--------------------------- 规约初始化 -----------------------------------*/
apr_status_t run_protocol()
{
apr_status_t rv;
apr_thread_t* rtdb_thread;
static apr_threadattr_t* worker_attr = NULL;
//lnk20250214//单连模式进程先通过进程号0获取所有台账然后再更新自己的进程号
if (g_onlyIP[0] != 0 && g_front_seg_index == 0 && g_front_seg_num >= 10){ //这是web端控制打开的单连进程
g_front_seg_index = g_front_seg_num; //更新进程号为:为这个单连进程设置的进程号,用来控制实时日志
}
init_MMS();
if (worker_attr == NULL)
if ((rv = apr_threadattr_create(&worker_attr, g_run_pool)) != APR_SUCCESS)
return rv;
if ((rv = apr_threadattr_detach_set(worker_attr, 1)) != APR_SUCCESS)
return rv;
if ((rv = apr_threadattr_stacksize_set(worker_attr, 1920 * 1024)) != APR_SUCCESS)
return rv;
rv = apr_threadattr_guardsize_set(worker_attr, 4096);
if (rv != APR_SUCCESS && rv != APR_ENOTIMPL)
return rv;
if ((rv = apr_thread_create(&rtdb_thread, worker_attr, rtdb_worker, NULL, g_run_pool)) != APR_SUCCESS)
return rv;
try_start_kafka_thread();//mq线程
//lnk20241213添加mq消费者线程
try_start_mqconsumer_thread();
///////////////////WW 2023-08-22 增加数据库和WebSocket线程
if (g_onlyIP[0] != 0 || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)
{
printf("g_onlyIP[0] != 0!\n\a");
//单连进程不打开socket、http线程
}
else //socket、http、测试线程的开启
{
printf("g_onlyIP[0] == 0!\n\a");
if (1 == SOCKETENABLE)
{
server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (server_socket == -1) {
printf("Web Socket failed,error msg=%s\n\a", strerror(errno));
exit(1);
}
int ServerPort = 13000;//WW 这里后面需要从数据库的表中读取
if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集
ServerPort = SOCKET_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召
ServerPort = SOCKET_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
}
else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {//3秒采集
ServerPort = SOCKET_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index;
}
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波
ServerPort = SOCKET_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
}
struct sockaddr_in server_sockaddr;
memset(&server_sockaddr, 0, sizeof(server_sockaddr));
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(ServerPort);
server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(server_socket, (struct sockaddr*)&server_sockaddr, sizeof(server_sockaddr)) == -1)
{
printf("bind failed, error msg = %s\n\a", strerror(errno));
printf("bind ServerPort is = %s\n\a", ServerPort);
exit(1);
}
if (listen(server_socket, 20) == -1)
{
printf("listen server_socket= %d,ServerPort= %d failed,error msg = %s\n\a", server_socket, ServerPort, strerror(errno));
exit(1);
}
printf("\n listen server_socket= %d,ServerPort= %d,wait for Web Socket client connecting......\n", server_socket, ServerPort);
printf("try_start_socket_thread \n");
try_start_socket_thread();
}
if (1 == HTTPENABLE)
{
//lnk20241029增加http线程///////////////////////////////////////////////////////////////////////////////////////////////
if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集
HTTP_PORT = HTTP_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召
HTTP_PORT = HTTP_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
}
else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {//3秒采集
HTTP_PORT = HTTP_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index;
}
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波
HTTP_PORT = HTTP_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
}
printf("try_start_web_http_thread \n");
try_start_web_http_thread();
printf("try_start_http_thread \n");
try_start_http_thread();
//lnk20241029增加http线程///////////////////////////////////////////////////////////////////////////////////////////////////
}
}
printf("try_start_ontimer_thread \n");
try_start_ontimer_thread();
return APR_SUCCESS;
}
extern uint32_t g_dead_lock_counter;
extern uint32_t g_thread_blocked_times;
/*--------------------------- 连接实时库线程 -----------------------------------*/
static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data)
{
while (1) {
doCommService();//处理61850消息
check_3s_config();//3秒数据进程读取3秒触发
CheckNextNotConnectedChannel();//所有长连接进程判断连接状态
CheckAllConnectedChannel();//触发报告、日志补召、发送心跳
create_recall_xml();//生成待补招xml文件
pthread_mutex_lock(&mtx);
check_ledger_update();//lnk20250113读取台账更新触发台账更新
pthread_mutex_unlock(&mtx);
check_disk_quota();//判断磁盘空间
apr_pool_clear(g_pt61850app->tmp_pool);//清除临时缓存
g_dead_lock_counter = 0;
g_thread_blocked_times = 0;//监控线程
}
echo_msg("rtdb worker thread terminated...");
}
////////////////////////////////////////////////////////////
void Set_val_from_61850rpt(element_t* elem, double v)
{
GET_DOTEXT_ADDR(elem)->DataStatus |= VALUE_REACHED;
GET_DOTEXT_ADDR(elem)->m_v = v;
}
int Set_q_from_61850rpt(char* q)
{
int quality = 0;
if (q[0] == '0' && q[1] == '0')
quality = 0;
else
quality = 1;
return quality;
}
#define TIME_T_2036 (66*365* SECONDS_PER_DAY)
apr_time_t convert_btod_to_apr_time(MMS_BTOD* btod)
{
MMS_BTIME6 btime6;
btime6.day = btod->day;
btime6.ms = btod->ms;
return convert_btime6_to_apr_time(&btime6);
}
apr_time_t convert_btime6_to_apr_time(MMS_BTIME6* bTime6)
{
apr_time_t ticks;
if ((TIME_T_1984_JAN_1 + (bTime6->day * SECONDS_PER_DAY)) > TIME_T_2036) {
echo_warn("时标错误超过2036年");
}
ticks = TIME_T_1984_JAN_1;
ticks += (bTime6->day * SECONDS_PER_DAY);
ticks *= 1000;
ticks += bTime6->ms;
ticks *= 1000;
return ticks;
}
/*
61850
位 属性名称 值
0-1 Good 00
Invalid 01
Reserved 10
Questionable11
2 Overflow
3 OutofRange
4 BadReference
5 Oscillatory
6 Failure
7 OldData
8 Inconsistent
9 Inaccurate
10 Source 0(process)/1(Substituted)
11 Test
12 OperatorBlocked
*/
/*
60870
位 属性名称 值
0 溢出
1 保留
2 保留
3 保留
4 被封锁
5 被取代
6 非当前值
7 无效
*/
byte_t get_mx_q_from_61850(char* q_61850)
{
QDS q;
q.bits = 0;
if (set_mx_q) {
if (!strlen(q_61850)) {
return q.bits;
}
if (!strcmp("0000000000000", q_61850)) {
return q.bits;
}
if (q_61850[0] == '0' && q_61850[1] == '1')
q.parts.IV = 1;
if (q_61850[0] == '1' && q_61850[1] == '1' && q_61850[7] == '1')
q.parts.NT = 1;
if (q_61850[2] == '1')
q.parts.OV = 1;
if (q_61850[12] == '1')
q.parts.BL = 1;
if (q_61850[10] == '1')
q.parts.SB = 1;
}
return q.bits;
}
/*
60870
位 属性名称 值
0 保留
1 保留
2 保留
3 保留
4 被封锁
5 被取代
6 非当前值
7 无效
*/
byte_t get_st_q_from_61850(char* q_61850)
{
SIQ q;
q.bits = 0;
if (!strlen(q_61850)) {
return q.bits;
}
if (!strcmp("0000000000000", q_61850)) {
return q.bits;
}
if (q_61850[0] == '0' && q_61850[1] == '1')
q.parts.IV = 1;
if (q_61850[0] == '1' && q_61850[1] == '1' && q_61850[7] == '1')
q.parts.NT = 1;
if (q_61850[12] == '1')
q.parts.BL = 1;
if (q_61850[10] == '1')
q.parts.SB = 1;
return q.bits;
}
byte_t get_pulse_q_from_61850(char* q_61850)
{
if (!strlen(q_61850)) {
return 0;
}
if (q_61850[0] == '0' && q_61850[1] == '1')
return 1;
else
return 0;
}