Files
front_linux/LFtid1056/main_thread.cpp

281 lines
8.7 KiB
C++
Raw Permalink Normal View History

2025-06-13 11:29:59 +08:00
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
2025-06-20 16:20:59 +08:00
#include "PQSMsg.h"
#include "client2.h"
2025-06-20 09:25:17 +08:00
#include "dealMsg.h"
2025-06-13 11:29:59 +08:00
2025-06-20 16:20:59 +08:00
#include "cloudfront/code/interface.h"
#if 0
2025-06-13 11:29:59 +08:00
/*
 * Configuration constants.
 * NOTE: these lines sit inside an `#if 0` region in this file, so the values
 * the compiler actually sees must come from elsewhere (presumably one of the
 * included project headers) - confirm they agree with the ones shown here.
 */
#define THREAD_CONNECTIONS 10 // number of slots in the thread_info table
#define MONITOR_INTERVAL 1 // monitor loop period, passed to sleep() (seconds)
/*
 * Lifecycle states of a managed thread.
 * NOTE: this definition sits inside an `#if 0` region in this file; the
 * definition actually in effect presumably comes from an included header.
 */
typedef enum {
THREAD_RUNNING, // 0: thread is running (set by each thread function on start-up)
THREAD_STOPPED, // 1: thread has stopped (initial state; also set by a thread function before it returns)
THREAD_RESTARTING, // 2: a restart is in progress (set by restart_thread)
THREAD_CRASHED // 3: thread died unexpectedly or could not be restarted
} thread_state_t;
/*
 * Per-thread control record; one instance per slot of the thread_info table.
 * NOTE: this definition sits inside an `#if 0` region in this file; the
 * definition actually in effect presumably comes from an included header.
 */
typedef struct {
pthread_t tid; // thread ID written by pthread_create
int index; // slot number of this thread (0 .. THREAD_CONNECTIONS-1)
thread_state_t state; // current lifecycle state
pthread_mutex_t lock; // per-thread mutex held around state updates and log output
} thread_info_t;
2025-06-20 16:20:59 +08:00
#endif
2025-06-13 11:29:59 +08:00
/* Global state */
thread_info_t thread_info[THREAD_CONNECTIONS]; // table of managed threads, indexed by thread slot
pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // global mutex; within this file it is only taken by restart_thread
2025-06-20 09:25:17 +08:00
// Message queue defined in another translation unit; this file pops messages
// from it (message_processor_thread) and reports its size (main).
extern SafeMessageQueue message_queue;
2025-06-13 11:29:59 +08:00
/*
 * Generic worker thread body (simulated workload).
 *
 * arg: heap-allocated int holding this thread's slot in thread_info; it is
 *      read once and freed here, so the caller must not touch it afterwards.
 * Returns NULL after marking the slot THREAD_STOPPED.
 */
void* work_thread(void* arg) {
    int* boxed_slot = (int*)arg;
    const int slot = *boxed_slot;
    free(boxed_slot);

    pthread_mutex_t* slot_lock = &thread_info[slot].lock;

    // Announce start-up and publish the RUNNING state under the slot's lock.
    pthread_mutex_lock(slot_lock);
    printf("Thread %d started\n", slot);
    thread_info[slot].state = THREAD_RUNNING;
    pthread_mutex_unlock(slot_lock);

    // Simulated work: wake up every 5 seconds; each wake-up has a 1-in-10
    // chance of ending the loop with a simulated failure.
    for (;;) {
        sleep(5);
        if (rand() % 10 != 0) {
            continue;
        }
        pthread_mutex_lock(slot_lock);
        printf("Thread %d simulated failure\n", slot);
        pthread_mutex_unlock(slot_lock);
        break;
    }

    // Termination: record the STOPPED state so the monitor can see it.
    pthread_mutex_lock(slot_lock);
    thread_info[slot].state = THREAD_STOPPED;
    printf("Thread %d stopped\n", slot);
    pthread_mutex_unlock(slot_lock);

    return NULL;
}
2025-06-20 09:25:17 +08:00
/* <20>̹߳<DFB3><CCB9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> 0<><30><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>*/
/* <20>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӹ<EFBFBD><D3B9><EFBFBD><EFBFBD>̺߳<DFB3><CCBA><EFBFBD>*/
void* client_manager_thread(void* arg) {
int index = *(int*)arg;
free(arg);
2025-06-13 11:29:59 +08:00
// <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>״̬Ϊ<CCAC><CEAA><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
pthread_mutex_lock(&thread_info[index].lock);
printf("Client Manager Thread %d started\n", index);
2025-06-13 11:29:59 +08:00
thread_info[index].state = THREAD_RUNNING;
pthread_mutex_unlock(&thread_info[index].lock);
printf("Started client connections\n");
2025-06-13 11:29:59 +08:00
2025-06-19 13:24:45 +08:00
start_client_connect();
printf("Stopped all client connections\n");
2025-06-13 11:29:59 +08:00
// <20>߳<EFBFBD><DFB3><EFBFBD>ֹ<EFBFBD><D6B9><EFBFBD><EFBFBD>
pthread_mutex_lock(&thread_info[index].lock);
thread_info[index].state = THREAD_STOPPED;
printf("Client Manager Thread %d stopped\n", index);
2025-06-13 11:29:59 +08:00
pthread_mutex_unlock(&thread_info[index].lock);
return NULL;
}
2025-06-20 09:25:17 +08:00
/*
 * Thread function for slot 1: message processor.
 *
 * arg: heap-allocated int holding this thread's slot in thread_info; it is
 *      read once and freed here.
 * Marks the slot RUNNING, then loops forever popping messages from
 * message_queue and passing each one to process_received_message().
 */
void* message_processor_thread(void* arg) {
    int* boxed_slot = (int*)arg;
    const int slot = *boxed_slot;
    free(boxed_slot);

    pthread_mutex_t* slot_lock = &thread_info[slot].lock;

    // Publish the RUNNING state under the slot's lock.
    pthread_mutex_lock(slot_lock);
    printf("Message Processor Thread %d started\n", slot);
    thread_info[slot].state = THREAD_RUNNING;
    pthread_mutex_unlock(slot_lock);

    // Message loop: there is no exit condition, so it never terminates.
    for (;;) {
        deal_message_t msg;
        if (!message_queue.pop(msg)) {
            continue;  // nothing was popped; try again
        }

        // msg.client_index identifies which client the message came from.
        printf("Processing message from client %d, length: %zu\n",
               msg.client_index, msg.length);
        process_received_message(msg.client_index, msg.data, msg.length);

        // The payload buffer is released here once processing is done.
        free(msg.data);
    }

    // Termination bookkeeping. Not reachable at present because the loop
    // above never breaks; kept to mirror the other thread functions.
    pthread_mutex_lock(slot_lock);
    thread_info[slot].state = THREAD_STOPPED;
    printf("Message Processor Thread %d stopped\n", slot);
    pthread_mutex_unlock(slot_lock);

    return NULL;
}
2025-06-13 11:29:59 +08:00
/* <20>߳<EFBFBD><DFB3><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
void restart_thread(int index) {
pthread_mutex_lock(&global_lock);
if (thread_info[index].state == THREAD_RESTARTING) {
pthread_mutex_unlock(&global_lock);
return; // <20><><EFBFBD><EFBFBD><EFBFBD>ظ<EFBFBD><D8B8><EFBFBD><EFBFBD><EFBFBD>
}
thread_info[index].state = THREAD_RESTARTING;
printf("Restarting thread %d\n", index);
pthread_mutex_unlock(&global_lock);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
int* new_index = (int*)malloc(sizeof(int));
*new_index = index;
if (index == 0) {
// <20>ͻ<EFBFBD><CDBB>˹<EFBFBD><CBB9><EFBFBD><EFBFBD>߳<EFBFBD>
if (pthread_create(&thread_info[index].tid, NULL, client_manager_thread, new_index) != 0) {
2025-06-13 11:29:59 +08:00
pthread_mutex_lock(&global_lock);
printf("Failed to restart client manager thread %d\n", index);
2025-06-13 11:29:59 +08:00
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index);
}
}
2025-06-20 09:25:17 +08:00
else if (index == 1) {
// <20><>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index);
}
}
2025-06-20 16:20:59 +08:00
else if (index == 2) {
// <20>ӿڣ<D3BF>mq
char* argv[] = { (char*)new_index ,(char*)"-dcfg_stat_data", (char*)"-s1_1" };
ThreadArgs* args = new ThreadArgs{3, argv};
if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
delete args; // <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD>ɹ<EFBFBD><C9B9><EFBFBD><EFBFBD>ֶ<EFBFBD><D6B6>ͷ<EFBFBD>
free(new_index);
}
}
2025-06-13 11:29:59 +08:00
else {
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>գ<EFBFBD>ʵ<EFBFBD><CAB5>Ӧ<EFBFBD><D3A6><EFBFBD>п<EFBFBD><D0BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
2025-06-13 11:29:59 +08:00
}
}
/*
 * Liveness probe for a thread.
 *
 * Sends signal 0 to `tid` with pthread_kill(): no signal is delivered, only
 * the error checking is performed. Returns 1 when that call succeeds and 0
 * otherwise.
 *
 * NOTE(review): how pthread_kill() reports a thread that has already exited
 * but has not been joined appears to be platform dependent - confirm on the
 * target libc. A pthread_tryjoin_np()-based check (EBUSY meaning "still
 * running") was tried here previously and is the documented alternative.
 */
int is_thread_alive(pthread_t tid) {
    const int probe_rc = pthread_kill(tid, 0);
    return probe_rc == 0;
}
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
int main() {
2025-06-13 11:29:59 +08:00
srand(time(NULL)); // <20><>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
// <20><>ʼ<EFBFBD><CABC><EFBFBD>߳<EFBFBD><DFB3><EFBFBD><EFBFBD><EFBFBD>
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
thread_info[i].index = i;
thread_info[i].state = THREAD_STOPPED;
pthread_mutex_init(&thread_info[i].lock, NULL); // <20><>ʼ<EFBFBD><CABC>ÿ<EFBFBD><C3BF><EFBFBD>̵߳<DFB3><CCB5><EFBFBD>
}
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʼ<EFBFBD>߳<EFBFBD><DFB3><EFBFBD>
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
int* index = (int*)malloc(sizeof(int));
*index = i;
if (i == 0) {
// <20>ͻ<EFBFBD><CDBB>˹<EFBFBD><CBB9><EFBFBD><EFBFBD>߳<EFBFBD>
if (pthread_create(&thread_info[i].tid, NULL, client_manager_thread, index) != 0) {
printf("Failed to create client manager thread %d\n", i);
2025-06-13 11:29:59 +08:00
free(index);
}
}
2025-06-20 09:25:17 +08:00
else if (i == 1) {
// <20><>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) {
printf("Failed to create message processor thread %d\n", i);
free(index);
}
}
2025-06-20 16:20:59 +08:00
else if (i == 2){
//<2F>ӿں<D3BF>mq
char* argv[] = { (char*)index,(char*)"-dcfg_stat_data", (char*)"-s1_1" };
ThreadArgs* args = new ThreadArgs{3, argv};
if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) {
printf("Failed to create message processor thread %d\n", i);
delete args; // <20><><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>û<EFBFBD><C3BB><EFBFBD><EFBFBD><EFBFBD>ɹ<EFBFBD><C9B9><EFBFBD><EFBFBD>ֶ<EFBFBD><D6B6>ͷ<EFBFBD>
free(index);
}
}
2025-06-13 11:29:59 +08:00
else {
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>գ<EFBFBD>ʵ<EFBFBD><CAB5>Ӧ<EFBFBD><D3A6><EFBFBD>п<EFBFBD><D0BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
free(index);
2025-06-13 11:29:59 +08:00
}
2025-06-20 09:25:17 +08:00
2025-06-13 11:29:59 +08:00
}
printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ѭ<EFBFBD><D1AD>
while (1) {
sleep(MONITOR_INTERVAL);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>״̬
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
pthread_mutex_lock(&thread_info[i].lock);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD><DFB3>Ƿ<EFBFBD><C7B7><EFBFBD><EFBFBD><EFBFBD>
if (thread_info[i].state == THREAD_RUNNING && !is_thread_alive(thread_info[i].tid)) {
printf("Thread %d crashed unexpectedly\n", i);
thread_info[i].state = THREAD_CRASHED;
}
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><D2AA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD>
if (thread_info[i].state == THREAD_STOPPED || thread_info[i].state == THREAD_CRASHED) {
pthread_mutex_unlock(&thread_info[i].lock);
restart_thread(i); // <20><EFBFBD><ECB2BD><EFBFBD><EFBFBD>
}
else {
pthread_mutex_unlock(&thread_info[i].lock);
}
}
2025-06-20 09:25:17 +08:00
// <20><><EFBFBD><EFBFBD>socket<65><74><EFBFBD><EFBFBD>״̬
static int queue_monitor = 0;
if (++queue_monitor >= 10) { // ÿ10<31><EFBFBD><EBB1A8>һ<EFBFBD><D2BB>
printf("Message queue size: %zu\n", message_queue.size());
queue_monitor = 0;
}
2025-06-13 11:29:59 +08:00
}
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Դ(<28><><EFBFBD><EFBFBD><EFBFBD>ϲ<EFBFBD><CFB2><EFBFBD>ִ<EFBFBD>е<EFBFBD><D0B5><EFBFBD><EFBFBD><EFBFBD>)
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
pthread_mutex_destroy(&thread_info[i].lock);
}
return 0;
}