Files
microser/mms/rdb_client.c

550 lines
14 KiB
C
Raw Permalink Normal View History

2025-01-16 16:17:01 +08:00
/**
* @file: $RCSfile: rdb_client.c,v $
2025-05-09 16:53:07 +08:00
* @brief: $PROFIBUS SSRTDB交互
2025-01-16 16:17:01 +08:00
*
* @version: $Revision: 1.11 $
* @date: $Date: 2020/10/28 05:21:18 $
* @author: $Author: lizhongming $
* @state: $State: Exp $
*
* @latest: $Id: rdb_client.c,v 1.11 2020/10/28 05:21:18 lizhongming Exp $
*/
#include <string.h>
#include "rdb_client.h"
#include "db_interface.h"
#include "node.h"
2025-05-09 16:53:07 +08:00
#include <pthread.h>//lnk20250114给台账添加互斥锁
2025-01-16 16:17:01 +08:00
2025-05-14 16:42:29 +08:00
#include "../log4cplus/log4.h"//lnk添加log4
2025-01-16 16:17:01 +08:00
/*lnk10-10 */////////////////////////////////
extern int HTTP_PORT;
extern int SOCKET_PORT;
extern int G_TEST_FLAG;
extern int g_front_seg_index;
extern int g_front_seg_num;
2025-05-09 16:53:07 +08:00
#include "../rocketmq/SimpleProducer.h"
2025-03-04 17:29:04 +08:00
#include "../cfg_parse/custom_printf.h"//lnk20250225
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
////////////////////////////////////////////
#ifdef DEBUG_SISCO
SD_CONST static ST_CHAR* SD_CONST thisFileName = __FILE__;
#endif
extern RPT_TYPEIDS g_rpt_typeids;
extern apr_pool_t* g_root_pool;
uint8_t set_mx_q;
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
node_t* g_node = NULL;
extern char g_my_conf_fname[256];
apr_pool_t* g_init_pool;
apr_pool_t* g_run_pool;
apr_pool_t* g_temp_dev_pool;
2025-05-09 16:53:07 +08:00
//lnk20250114给台账添加互斥锁
2025-01-16 16:17:01 +08:00
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
2025-05-09 16:53:07 +08:00
extern char g_onlyIP[255]; //直连某个IP仅仅为方便测试
//为无锡西径变调档添加
2025-01-16 16:17:01 +08:00
uint8_t set_mx_q;
pt61850app_t* g_pt61850app;
static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data);
static apr_status_t pt61850app_init();
///////////////////////////////////////////////////////////////////////////////
extern int three_secs_enabled;
///////////////////////////////////////////////////////////////////////////////
//WW 2023-08-22 start
int server_socket = -1;
//WW 2023-08-22 end
///////////////////////////////////////////////////////////////////////////////
/**
 * Allocate the global application context and its temporary pool.
 *
 * g_pt61850app is zero-allocated from g_run_pool and its tmp_pool is
 * created as a child of g_root_pool; the channel and init counters are
 * then reset to zero.
 *
 * @return APR_SUCCESS on success, APR_ENOMEM when the context allocation
 *         yields NULL, otherwise the status returned by apr_pool_create().
 */
static apr_status_t pt61850app_init()
{
    apr_status_t status;

    g_pt61850app = apr_pcalloc(g_run_pool, sizeof(pt61850app_t));
    if (g_pt61850app == NULL) {
        return APR_ENOMEM;
    }

    status = apr_pool_create(&(g_pt61850app->tmp_pool), g_root_pool);
    if (status == APR_SUCCESS) {
        g_pt61850app->chnl_counts = 0;
        g_pt61850app->initNum = 0;
    }
    return status;
}
static apr_status_t allocate_LD_chnl_ext_mem()
{
int iedno, cpuno, chnl_no;
ied_t* ied;
ied_usr_t* ied_usr;
chnl_usr_t* chnl_usr;
g_pt61850app->chnl_counts = 0;
for (iedno = 0; iedno < g_node->n_clients; iedno++) {
ied = g_node->clients[iedno];
ied_usr = apr_pcalloc(g_init_pool, sizeof(ied_usr_t));
ied->usr_ext = ied_usr;
if (ied_usr == NULL)
return APR_ENOMEM;
ied_usr->last_call_wavelist_time = sGetMsTime() + g_pt61850app->giTime * 1000 * 0.5;
ied_usr->LD_info = apr_pcalloc(g_init_pool, ied->cpucount * sizeof(LD_info_t));
if (ied_usr->LD_info == NULL)
return APR_ENOMEM;
for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
ied_usr->LD_info[cpuno].ied = ied;
ied_usr->LD_info[cpuno].cpuno = ied->cpuinfo[cpuno].addr;
ied_usr->LD_info[cpuno].ht_fcd = apr_hash_make(g_init_pool);
ied_usr->LD_info[cpuno].ht_full_fcda = apr_hash_make(g_init_pool);
ied_usr->LD_info[cpuno].rptcount = 0;
}
for (chnl_no = 0; chnl_no < ied->chncount; chnl_no++) {
chnl_usr = apr_pcalloc(g_init_pool, sizeof(chnl_usr_t));
ied->channel[chnl_no].connect = chnl_usr;
chnl_usr->chnl = &(ied->channel[chnl_no]);
chnl_usr->chnl_id = chnl_no;
chnl_usr->m_state = CHANNEL_DISCONNECTED;
chnl_usr->m_ClosedMsTime = NEXT_CONNECT_TIME * (-1);
}
g_pt61850app->chnl_counts += ied->chncount;
}
return APR_SUCCESS;
}
static apr_status_t read_DEV_idx_from_db()
{
int ret;
int iedno, cpuno;
ied_t* ied;
ied_usr_t* ied_usr;
LD_info_t* LD_info;
loginfo_t* loginfo = NULL;
int len, tmp;
for (iedno = 0; iedno < g_node->n_clients; iedno++) {
ied = g_node->clients[iedno];
ied_usr = GET_IEDEXT_ADDR(ied);
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
for (cpuno = 0; cpuno < ied->cpucount; cpuno++) {
LD_info = &(ied_usr->LD_info[cpuno]);
if (LD_info->LD_name == NULL)
continue;
len = strlen(LD_info->LD_name);
tmp = LD_info->LD_name[len - 1] - '0';
LD_info->line_id = ied_usr->dev_idx * 10 + tmp;
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
if (ret != TRUE)
LD_info->line_id = -1;
if (LD_info->loginfo) {
loginfo = LD_info->loginfo[0];
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
}
}
}
return APR_SUCCESS;
}
apr_status_t init_rdb()
{
apr_status_t rv;
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
rv = apr_pool_create(&g_init_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
rv = apr_pool_create(&g_run_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
rv = apr_pool_create(&g_temp_dev_pool, g_root_pool);
if (rv != APR_SUCCESS) {
return rv;
}
g_node = apr_pcalloc(g_run_pool, sizeof(node_t));
if (rv != APR_SUCCESS) {
return rv;
}
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
rv = pt61850app_init();
if (rv != APR_SUCCESS) {
return rv;
}
init_config();
GetServerIndexFromDB();
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
rv = parse_device_cfg_web();
if (rv != APR_SUCCESS) {
echo_errg("Parsed device config xml file with error,try to run! \n");
2025-05-20 16:31:12 +08:00
//char buf[256];
//format_log_msg(buf,sizeof(buf),"前置的%s%d号进程调用web台账接口失败", get_front_msg_from_subdir(), g_front_seg_index);
//log_error("process", buf);
2026-01-06 10:23:43 +08:00
DIY_ERRORLOG_CODE("process",LOG_CODE_LEDGER,"前置的%s%d号进程调用web台账接口失败", get_front_msg_from_subdir(), g_front_seg_index);
2025-05-20 16:31:12 +08:00
2025-01-16 16:17:01 +08:00
return rv;
}
2025-05-09 16:53:07 +08:00
//台账读取过后初始化各级的日志
2025-05-14 16:42:29 +08:00
init_loggers();
2025-05-09 16:53:07 +08:00
2025-01-16 16:17:01 +08:00
rv = parse_model_cfg_web();
2025-05-30 15:40:20 +08:00
if (rv != APR_SUCCESS) {//不可能
2025-01-16 16:17:01 +08:00
echo_errg("Parsed model with error,try to run! \n");
2025-05-20 16:31:12 +08:00
//char buf[256];
//format_log_msg(buf,sizeof(buf),"前置的%s%d号进程调用web模型接口失败", get_front_msg_from_subdir(), g_front_seg_index);
//log_error("process", buf);
2026-01-06 10:23:43 +08:00
DIY_ERRORLOG_CODE("process",LOG_CODE_ICD_AND_DOWNLOAD,"前置的%s%d号进程调用web模型接口失败", get_front_msg_from_subdir(), g_front_seg_index);
2025-05-20 16:31:12 +08:00
2025-01-16 16:17:01 +08:00
return rv;
}
2025-05-09 16:53:07 +08:00
Set_xml_nodeinfo();//解析xml模型
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
rv = parse_rpt_log_ini();//报告块初始化
2025-01-16 16:17:01 +08:00
if (rv != APR_SUCCESS) {
echo_errg("Failed to parse report log define ini file! \n");
2025-05-20 16:31:12 +08:00
2026-01-06 10:23:43 +08:00
DIY_ERRORLOG_CODE("process",LOG_CODE_RPTINIT,"前置的%s%d号进程报告初始化失败", get_front_msg_from_subdir(), g_front_seg_index);
2025-05-20 16:31:12 +08:00
2025-01-16 16:17:01 +08:00
return rv;
}
if (app_get_private_config(g_my_conf_fname) != APR_SUCCESS) {
echo_errg("Failed when processing private configuration\n");
2025-05-20 16:31:12 +08:00
2026-01-06 10:23:43 +08:00
DIY_ERRORLOG_CODE("process",LOG_CODE_OTHER,"前置的%s%d号进程读取mms配置失败", get_front_msg_from_subdir(), g_front_seg_index);
2025-05-20 16:31:12 +08:00
2025-01-16 16:17:01 +08:00
return APR_EGENERAL;
}
2025-05-20 16:31:12 +08:00
2025-01-16 16:17:01 +08:00
init_rem_dib_table();
return APR_SUCCESS;
}
extern int SOCKETENABLE;
extern int HTTPENABLE;
2025-05-09 16:53:07 +08:00
/*--------------------------- 规约初始化 -----------------------------------*/
2025-01-16 16:17:01 +08:00
apr_status_t run_protocol()
{
apr_status_t rv;
apr_thread_t* rtdb_thread;
2025-04-29 15:05:36 +08:00
2025-01-16 16:17:01 +08:00
static apr_threadattr_t* worker_attr = NULL;
2025-05-09 16:53:07 +08:00
//lnk20250214//单连模式进程先通过进程号0获取所有台账然后再更新自己的进程号
if (g_onlyIP[0] != 0 && g_front_seg_index == 0 && g_front_seg_num >= 10){ //这是web端控制打开的单连进程
g_front_seg_index = g_front_seg_num; //更新进程号为:为这个单连进程设置的进程号,用来控制实时日志
2025-02-14 16:44:38 +08:00
}
2025-01-16 16:17:01 +08:00
init_MMS();
if (worker_attr == NULL)
if ((rv = apr_threadattr_create(&worker_attr, g_run_pool)) != APR_SUCCESS)
return rv;
if ((rv = apr_threadattr_detach_set(worker_attr, 1)) != APR_SUCCESS)
return rv;
if ((rv = apr_threadattr_stacksize_set(worker_attr, 1920 * 1024)) != APR_SUCCESS)
return rv;
rv = apr_threadattr_guardsize_set(worker_attr, 4096);
if (rv != APR_SUCCESS && rv != APR_ENOTIMPL)
return rv;
if ((rv = apr_thread_create(&rtdb_thread, worker_attr, rtdb_worker, NULL, g_run_pool)) != APR_SUCCESS)
return rv;
2025-05-09 16:53:07 +08:00
try_start_kafka_thread();//mq线程
2025-01-16 16:17:01 +08:00
2025-05-09 16:53:07 +08:00
//lnk20241213添加mq消费者线程
2025-01-16 16:17:01 +08:00
try_start_mqconsumer_thread();
2025-05-09 16:53:07 +08:00
///////////////////WW 2023-08-22 增加数据库和WebSocket线程
2025-01-16 16:17:01 +08:00
if (g_onlyIP[0] != 0 || g_node_id == NEW_HIS_DATA_BASE_NODE_ID || g_node_id == HIS_DATA_BASE_NODE_ID || g_node_id == RECALL_ALL_DATA_BASE_NODE_ID)
{
printf("g_onlyIP[0] != 0!\n\a");
2025-02-11 18:23:19 +08:00
2025-05-09 16:53:07 +08:00
//单连进程不打开socket、http线程
2025-02-11 18:23:19 +08:00
2025-01-16 16:17:01 +08:00
}
2025-05-09 16:53:07 +08:00
else //socket、http、测试线程的开启
2025-01-16 16:17:01 +08:00
{
printf("g_onlyIP[0] == 0!\n\a");
2025-02-11 18:23:19 +08:00
if (1 == SOCKETENABLE)
2025-01-16 16:17:01 +08:00
{
server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (server_socket == -1) {
printf("Web Socket failed,error msg=%s\n\a", strerror(errno));
exit(1);
}
2025-05-09 16:53:07 +08:00
int ServerPort = 13000;//WW 这里后面需要从数据库的表中读取
if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集
2025-01-16 16:17:01 +08:00
ServerPort = SOCKET_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
2025-05-09 16:53:07 +08:00
else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召
2025-01-16 16:17:01 +08:00
ServerPort = SOCKET_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
}
2025-05-09 16:53:07 +08:00
else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {//3秒采集
2025-01-16 16:17:01 +08:00
ServerPort = SOCKET_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index;
}
2025-05-09 16:53:07 +08:00
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波
2025-01-16 16:17:01 +08:00
ServerPort = SOCKET_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
}
struct sockaddr_in server_sockaddr;
memset(&server_sockaddr, 0, sizeof(server_sockaddr));
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(ServerPort);
server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(server_socket, (struct sockaddr*)&server_sockaddr, sizeof(server_sockaddr)) == -1)
{
printf("bind failed, error msg = %s\n\a", strerror(errno));
printf("bind ServerPort is = %s\n\a", ServerPort);
exit(1);
}
if (listen(server_socket, 20) == -1)
{
printf("listen server_socket= %d,ServerPort= %d failed,error msg = %s\n\a", server_socket, ServerPort, strerror(errno));
exit(1);
}
printf("\n listen server_socket= %d,ServerPort= %d,wait for Web Socket client connecting......\n", server_socket, ServerPort);
printf("try_start_socket_thread \n");
try_start_socket_thread();
}
2025-02-11 18:23:19 +08:00
if (1 == HTTPENABLE)
2025-01-16 16:17:01 +08:00
{
2025-05-09 16:53:07 +08:00
//lnk20241029增加http线程///////////////////////////////////////////////////////////////////////////////////////////////
if (g_node_id == STAT_DATA_BASE_NODE_ID)//统计采集
2025-01-16 16:17:01 +08:00
HTTP_PORT = HTTP_PORT + STAT_DATA_BASE_NODE_ID + g_front_seg_index;
2025-05-09 16:53:07 +08:00
else if (g_node_id == RECALL_HIS_DATA_BASE_NODE_ID) {//补召
2025-01-16 16:17:01 +08:00
HTTP_PORT = HTTP_PORT + RECALL_HIS_DATA_BASE_NODE_ID + g_front_seg_index;
}
2025-05-09 16:53:07 +08:00
else if (g_node_id == THREE_SECS_DATA_BASE_NODE_ID) {//3秒采集
2025-01-16 16:17:01 +08:00
HTTP_PORT = HTTP_PORT + THREE_SECS_DATA_BASE_NODE_ID + g_front_seg_index;
}
2025-05-09 16:53:07 +08:00
else if (g_node_id == SOE_COMTRADE_BASE_NODE_ID) {//暂态录波
2025-01-16 16:17:01 +08:00
HTTP_PORT = HTTP_PORT + SOE_COMTRADE_BASE_NODE_ID + g_front_seg_index;
}
printf("try_start_web_http_thread \n");
try_start_web_http_thread();
printf("try_start_http_thread \n");
try_start_http_thread();
2025-05-09 16:53:07 +08:00
//lnk20241029增加http线程///////////////////////////////////////////////////////////////////////////////////////////////////
2025-01-16 16:17:01 +08:00
}
2025-03-04 17:29:04 +08:00
2025-01-16 16:17:01 +08:00
}
printf("try_start_ontimer_thread \n");
try_start_ontimer_thread();
return APR_SUCCESS;
}
extern uint32_t g_dead_lock_counter;
extern uint32_t g_thread_blocked_times;
2025-05-09 16:53:07 +08:00
/*--------------------------- 连接实时库线程 -----------------------------------*/
2025-01-16 16:17:01 +08:00
/**
 * Main service loop of the real-time database worker thread.
 *
 * Each iteration runs the service steps below in a fixed order, clears
 * the application's temporary pool, and resets the two watchdog counters
 * (g_dead_lock_counter, g_thread_blocked_times).  The ledger update check
 * is the only step performed while holding the global mutex `mtx`.
 *
 * @param thd   APR thread handle (unused).
 * @param data  thread argument (unused).
 * @return never returns in practice; NULL is returned only to satisfy
 *         the thread-function signature.
 */
static void* APR_THREAD_FUNC rtdb_worker(apr_thread_t* thd, void* data)
{
    (void)thd;
    (void)data;

    for (;;) {
        doCommService();                    /* process IEC 61850 messages */

        check_3s_config();                  /* 3-second data process: read the 3-second trigger */

        CheckNextNotConnectedChannel();     /* long-lived connection processes: check connection state */

        CheckAllConnectedChannel();         /* trigger reports, recall logs, send heartbeats */

        create_recall_xml();                /* generate the XML file of pending recalls */

        pthread_mutex_lock(&mtx);
        check_ledger_update();              /* lnk20250113: read ledger updates and apply them */
        pthread_mutex_unlock(&mtx);

        check_disk_quota();                 /* check disk space */
        apr_pool_clear(g_pt61850app->tmp_pool); /* drop the temporary cache */

        g_dead_lock_counter = 0;
        g_thread_blocked_times = 0;         /* counters read by the monitoring thread */
    }

    /* Not reached: the loop above has no exit. */
    echo_msg("rtdb worker thread terminated...");
    return NULL;
}
////////////////////////////////////////////////////////////
/**
 * Record a value delivered by an IEC 61850 report on an element.
 *
 * Works on the extension structure obtained via GET_DOTEXT_ADDR(elem):
 * the VALUE_REACHED flag is OR-ed into DataStatus and v is stored in m_v.
 *
 * @param elem  element whose extension is updated; must be valid, no NULL
 *              check is performed here.
 * @param v     value to store.
 */
void Set_val_from_61850rpt(element_t* elem, double v)
{
GET_DOTEXT_ADDR(elem)->DataStatus |= VALUE_REACHED;
GET_DOTEXT_ADDR(elem)->m_v = v;
}
/**
 * Reduce an IEC 61850 quality bit string to a single good/bad flag.
 *
 * Only the first two characters are inspected.
 *
 * @param q  quality as a string of '0'/'1' characters; must not be NULL.
 * @return 0 when the first two characters are "00", otherwise 1.
 */
int Set_q_from_61850rpt(char* q)
{
    const int validity_is_good = (q[0] == '0') && (q[1] == '0');

    return validity_is_good ? 0 : 1;
}
/* Upper sanity bound for decoded timestamps: 66 * 365 days expressed in
 * seconds (leap days are not counted). */
#define TIME_T_2036 (66*365* SECONDS_PER_DAY)

/**
 * Convert an MMS_BTOD time to apr_time_t.
 *
 * The day and ms fields are copied into a temporary MMS_BTIME6 and the
 * conversion is delegated to convert_btime6_to_apr_time().
 *
 * @param btod  source time; must not be NULL.
 * @return the value produced by convert_btime6_to_apr_time().
 */
apr_time_t convert_btod_to_apr_time(MMS_BTOD* btod)
{
    MMS_BTIME6 as_btime6;

    as_btime6.ms = btod->ms;
    as_btime6.day = btod->day;

    return convert_btime6_to_apr_time(&as_btime6);
}
/**
 * Convert an MMS_BTIME6 (day count plus milliseconds within the day) to
 * apr_time_t.
 *
 * The result is ((TIME_T_1984_JAN_1 + day * SECONDS_PER_DAY) * 1000 + ms) * 1000.
 * A warning is logged when the seconds value exceeds TIME_T_2036; the
 * conversion is still performed.
 *
 * The day count is widened to apr_time_t before it is multiplied, so a
 * large or corrupt day value can neither overflow the narrower operand
 * type nor slip past the sanity check.
 *
 * @param bTime6  source time; must not be NULL.
 * @return the converted time.
 */
apr_time_t convert_btime6_to_apr_time(MMS_BTIME6* bTime6)
{
    apr_time_t seconds = (apr_time_t)TIME_T_1984_JAN_1
                       + (apr_time_t)bTime6->day * SECONDS_PER_DAY;

    if (seconds > TIME_T_2036) {
        echo_warn("时标错误超过2036年");
    }

    return (seconds * 1000 + bTime6->ms) * 1000;
}
/*
 * IEC 61850 quality, bit positions within the quality bit string:
 *
 *   0-1  validity:  00 Good
 *                   01 Invalid
 *                   10 Reserved
 *                   11 Questionable
 *   2    Overflow
 *   3    OutofRange
 *   4    BadReference
 *   5    Oscillatory
 *   6    Failure
 *   7    OldData
 *   8    Inconsistent
 *   9    Inaccurate
 *   10   Source: 0 (process) / 1 (substituted)
 *   11   Test
 *   12   OperatorBlocked
 */
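/*
 * IEC 60870-5-101 quality descriptor for measured values (QDS),
 * bit 0 = least significant bit
 * (NOTE(review): confirm against the QDS union declared in the project headers):
 *
 *   0    OV  overflow
 *   1-3  reserved
 *   4    BL  blocked
 *   5    SB  substituted
 *   6    NT  not topical
 *   7    IV  invalid
 */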
/**
 * Map an IEC 61850 quality bit string to a QDS quality byte for a
 * measured value.
 *
 * The mapping is only performed when the global set_mx_q is non-zero;
 * otherwise 0 is returned.  Mapping rules:
 *   - validity "01"                     -> IV
 *   - validity "11" and bit 7 (OldData) -> NT
 *   - bit 2  (Overflow)                 -> OV
 *   - bit 12 (OperatorBlocked)          -> BL
 *   - bit 10 (Source = substituted)     -> SB
 *
 * A NULL or empty string, or the all-good string "0000000000000", gives 0.
 * Strings shorter than 13 characters are accepted: a bit position beyond
 * the end of the string is treated as not set rather than read.
 *
 * @param q_61850  quality as a string of '0'/'1' characters, may be NULL.
 * @return the assembled quality byte (q.bits).
 */
byte_t get_mx_q_from_61850(char* q_61850)
{
    QDS q;
    size_t len;

    q.bits = 0;
    if (!set_mx_q || q_61850 == NULL) {
        return q.bits;
    }

    len = strlen(q_61850);
    if (len == 0 || !strcmp("0000000000000", q_61850)) {
        return q.bits;
    }

    /* len >= 1 here, so index 1 is at worst the terminating NUL. */
    if (q_61850[0] == '0' && q_61850[1] == '1')
        q.parts.IV = 1;
    if (len > 7 && q_61850[0] == '1' && q_61850[1] == '1' && q_61850[7] == '1')
        q.parts.NT = 1;
    if (len > 2 && q_61850[2] == '1')
        q.parts.OV = 1;
    if (len > 12 && q_61850[12] == '1')
        q.parts.BL = 1;
    if (len > 10 && q_61850[10] == '1')
        q.parts.SB = 1;

    return q.bits;
}
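/*
 * IEC 60870-5-101 single-point information with quality descriptor (SIQ),
 * bit 0 = least significant bit
 * (NOTE(review): confirm against the SIQ union declared in the project headers):
 *
 *   0    SPI single-point state
 *   1-3  reserved
 *   4    BL  blocked
 *   5    SB  substituted
 *   6    NT  not topical
 *   7    IV  invalid
 */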
/**
 * Map an IEC 61850 quality bit string to a SIQ quality byte for a
 * status value.
 *
 * Mapping rules:
 *   - validity "01"                     -> IV
 *   - validity "11" and bit 7 (OldData) -> NT
 *   - bit 12 (OperatorBlocked)          -> BL
 *   - bit 10 (Source = substituted)     -> SB
 *
 * A NULL or empty string, or the all-good string "0000000000000", gives 0.
 * Strings shorter than 13 characters are accepted: a bit position beyond
 * the end of the string is treated as not set rather than read.
 *
 * @param q_61850  quality as a string of '0'/'1' characters, may be NULL.
 * @return the assembled quality byte (q.bits).
 */
byte_t get_st_q_from_61850(char* q_61850)
{
    SIQ q;
    size_t len;

    q.bits = 0;
    if (q_61850 == NULL) {
        return q.bits;
    }

    len = strlen(q_61850);
    if (len == 0 || !strcmp("0000000000000", q_61850)) {
        return q.bits;
    }

    /* len >= 1 here, so index 1 is at worst the terminating NUL. */
    if (q_61850[0] == '0' && q_61850[1] == '1')
        q.parts.IV = 1;
    if (len > 7 && q_61850[0] == '1' && q_61850[1] == '1' && q_61850[7] == '1')
        q.parts.NT = 1;
    if (len > 12 && q_61850[12] == '1')
        q.parts.BL = 1;
    if (len > 10 && q_61850[10] == '1')
        q.parts.SB = 1;

    return q.bits;
}
/**
 * Derive the quality flag of a pulse value from an IEC 61850 quality
 * bit string.
 *
 * @param q_61850  quality as a string of '0'/'1' characters; must not be NULL.
 * @return 1 when the string starts with "01", 0 otherwise (including
 *         for an empty string).
 */
byte_t get_pulse_q_from_61850(char* q_61850)
{
    if (q_61850[0] == '\0') {
        return 0;
    }

    return (q_61850[0] == '0' && q_61850[1] == '1') ? 1 : 0;
}