Files
front_linux/LFtid1056/main_thread.cpp

322 lines
9.6 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include "client2.h"
#include "cloudfront/code/interface.h"
using namespace std;
#if 0
/* 常量定义 */
#define THREAD_CONNECTIONS 10 // 最大线程数
#define MONITOR_INTERVAL 1 // 监控间隔(秒)
/* 线程状态枚举 */
typedef enum {
THREAD_RUNNING, // 0:运行中
THREAD_STOPPED, // 1:正常停止
THREAD_RESTARTING, // 2:重启中
THREAD_CRASHED // 3:异常崩溃
} thread_state_t;
/* 线程控制结构体 */
typedef struct {
pthread_t tid; // 线程ID
int index; // 线程编号(0~CONNECTIONS-1)
thread_state_t state; // 当前状态
pthread_mutex_t lock; // 线程专用互斥锁
} thread_info_t;
#endif
/* 全局变量 */
thread_info_t thread_info[THREAD_CONNECTIONS]; // 线程信息数组
pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // 全局互斥锁
extern SafeMessageQueue message_queue;
// 生成测试装置
std::vector<DeviceInfo> generate_test_devices(int count) {
std::vector<DeviceInfo> devices;
for (int i = 1; i <= count; ++i) {
// 生成装置ID和名称
std::string dev_id = "D" + std::to_string(1000 + i).substr(1); // D001, D002, ..., D100
std::string dev_name = "Device " + std::to_string(i);
// 生成测点
std::vector<PointInfo> points = {
{
"P" + dev_id.substr(1) + "01", // 测点ID如 P00101
"Voltage " + dev_name,
dev_id,
1,
0.0, // 随机电压值
0.0,
100.0,
80.0
},
{
"P" + dev_id.substr(1) + "02", // 测点ID如 P00102
"Current " + dev_name,
dev_id,
2,
0.0, // 随机电流值
0.0,
20.0,
15.0
}
};
// 添加装置
devices.push_back({
dev_id,
dev_name,
(i % 2 == 0) ? "Model-X" : "Model-Y", // 交替使用两种型号
"00-B7-8D-A8-00-D6", // 随机MAC地址
1, // 状态 (1=在线)
points
});
}
return devices;
}
/* 线程工作函数 0号子线程*/
/* 客户端连接管理线程函数*/
void* client_manager_thread(void* arg) {
int index = *(int*)arg;
free(arg);
// 更新线程状态为运行中
pthread_mutex_lock(&thread_info[index].lock);
printf("Client Manager Thread %d started\n", index);
thread_info[index].state = THREAD_RUNNING;
pthread_mutex_unlock(&thread_info[index].lock);
printf("Started client connections\n");
// 创建测点数据
std::vector<PointInfo> points1 = {
{"P001", "Main Voltage", "D001",1 ,1, 1, 1, 1},
{"P002", "Backup Voltage", "D001",2 ,1, 1, 1, 1}
};
std::vector<PointInfo> points2 = {
{"P101", "Generator Output", "D002",1 ,1, 1, 1, 1}
};
//00-B7-8D-A8-00-D6
// 创建装置列表
std::vector<DeviceInfo> devices = {
{
"D001", "Primary Device", "Model-X", "00-B7-8D-A8-00-D9",
1, points1
},
{
"D002", "Backup Device", "Model-Y", "00-B7-8D-01-79-06",
1, points2
}
};
// 生成100个测试装置
std::vector<DeviceInfo> test_devices = generate_test_devices(100);
// 启动客户端连接
start_client_connect(devices);
printf("Stopped all client connections\n");
// 线程终止处理
pthread_mutex_lock(&thread_info[index].lock);
thread_info[index].state = THREAD_STOPPED;
printf("Client Manager Thread %d stopped\n", index);
pthread_mutex_unlock(&thread_info[index].lock);
return NULL;
}
/* 线程工作函数 1号子线程*/
/* 消息处理线程函数 */
void* message_processor_thread(void* arg) {
int index = *(int*)arg;
free(arg);
// 更新线程状态为运行中
pthread_mutex_lock(&thread_info[index].lock);
printf("Message Processor Thread %d started\n", index);
thread_info[index].state = THREAD_RUNNING;
pthread_mutex_unlock(&thread_info[index].lock);
// 消息处理循环
while (1) {
deal_message_t msg;
if (message_queue.pop(msg)) {
// 实际消息处理逻辑
// 注意这里需要根据msg.client_index区分客户端
// 处理完成后释放内存
// 调用实际的消息处理函数
process_received_message(msg.mac, msg.device_id, msg.data, msg.length);
free(msg.data);
}
else {
// 队列为空时短暂休眠100微秒 = 0.1毫秒)
usleep(100);
}
}
// 线程终止处理
pthread_mutex_lock(&thread_info[index].lock);
thread_info[index].state = THREAD_STOPPED;
printf("Message Processor Thread %d stopped\n", index);
pthread_mutex_unlock(&thread_info[index].lock);
return NULL;
}
/* 线程重启函数 */
void restart_thread(int index) {
pthread_mutex_lock(&global_lock);
if (thread_info[index].state == THREAD_RESTARTING) {
pthread_mutex_unlock(&global_lock);
return; // 避免重复重启
}
thread_info[index].state = THREAD_RESTARTING;
printf("Restarting thread %d\n", index);
pthread_mutex_unlock(&global_lock);
// 创建新线程
int* new_index = (int*)malloc(sizeof(int));
*new_index = index;
if (index == 0) {
// 客户端管理线程
if (pthread_create(&thread_info[index].tid, NULL, client_manager_thread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart client manager thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index);
}
}
else if (index == 1) {
// 消息处理线程
if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index);
}
}
else if (false) {
// 接口mq
char* argv[] = { (char*)new_index ,(char*)"-dcfg_stat_data", (char*)"-s1_1" };
ThreadArgs* args = new ThreadArgs{3, argv};
if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
delete args; // 如果线程没创建成功就手动释放
free(new_index);
}
}
else {
// 其他工作线程
// 这里简化为空,实际应用中可添加其他线程
}
}
/* 线程存活检测 */
int is_thread_alive(pthread_t tid) {
return pthread_tryjoin_np(tid, NULL) == EBUSY; // EBUSY表示线程仍在运行
}
/* 主函数 */
int main() {
srand(time(NULL)); // 初始化随机数种子
// 初始化线程数组
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
thread_info[i].index = i;
thread_info[i].state = THREAD_STOPPED;
pthread_mutex_init(&thread_info[i].lock, NULL); // 初始化每个线程的锁
}
// 创建初始线程组
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
int* index = (int*)malloc(sizeof(int));
*index = i;
if (i == 0) {
// 客户端管理线程
if (pthread_create(&thread_info[i].tid, NULL, client_manager_thread, index) != 0) {
printf("Failed to create client manager thread %d\n", i);
free(index);
}
}
else if (i == 1) {
// 消息处理线程
if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) {
printf("Failed to create message processor thread %d\n", i);
free(index);
}
}
else {
// 其他工作线程
// 这里简化为空,实际应用中可添加其他线程
free(index);
}
}
printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS);
// 主监控循环
while (1) {
sleep(MONITOR_INTERVAL);
// 检查所有线程状态
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
pthread_mutex_lock(&thread_info[i].lock);
// 检测运行中线程是否崩溃
if (thread_info[i].state == THREAD_RUNNING && !is_thread_alive(thread_info[i].tid)) {
printf("Thread %d crashed unexpectedly\n", i);
thread_info[i].state = THREAD_CRASHED;
}
// 处理需要重启的线程
if (thread_info[i].state == THREAD_STOPPED || thread_info[i].state == THREAD_CRASHED) {
pthread_mutex_unlock(&thread_info[i].lock);
restart_thread(i); // 异步重启
}
else {
pthread_mutex_unlock(&thread_info[i].lock);
}
}
// 监控socket队列状态
static int queue_monitor = 0;
//static int count = 3;
if (++queue_monitor >= 10) { // 每10秒报告一次
printf("Message queue size: %zu\n", message_queue.size());
queue_monitor = 0;
/*std::vector<DeviceInfo> test_devices = generate_test_devices(count);
count++;
for (const auto& device : test_devices) {
ClientManager::instance().add_device(device);
}
for (const auto& device : test_devices) {
ClientManager::instance().remove_device("D001");
}*/
}
}
// 清理资源(理论上不会执行到这里)
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
pthread_mutex_destroy(&thread_info[i].lock);
}
return 0;
}