Files
front_linux/LFtid1056/main_thread.cpp
2025-12-08 15:34:46 +08:00

534 lines
18 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include "client2.h"
#include "cloudfront/code/interface.h"
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;
#if 0
/*
 * Reference copy of the thread-bookkeeping declarations. This region is
 * compiled out; the same names are used further down in this file, so the
 * live definitions presumably come from an included header (likely
 * client2.h) -- TODO confirm.
 */
/* Constant definitions */
#define THREAD_CONNECTIONS 10 // maximum number of thread slots
#define MONITOR_INTERVAL 1 // monitoring interval (seconds)
/* Thread state enumeration */
typedef enum {
THREAD_RUNNING, // 0: running
THREAD_STOPPED, // 1: stopped normally
THREAD_RESTARTING, // 2: restart in progress
THREAD_CRASHED // 3: terminated abnormally
} thread_state_t;
/* Per-thread control structure */
typedef struct {
pthread_t tid; // thread ID
int index; // slot number (0 ~ THREAD_CONNECTIONS-1)
thread_state_t state; // current state
pthread_mutex_t lock; // mutex dedicated to this slot
} thread_info_t;
#endif
extern int INITFLAG;// flag set once the ledger (and related data) has finished initializing; main() polls it before starting workers
extern void cleanup_args(ThreadArgs* args); // defined elsewhere; only referenced from commented-out code in this file
/**
 * Turn the calling process into a background daemon using the classic
 * double-fork sequence:
 *   1. fork and let the parent exit, so the child is not a process-group leader;
 *   2. setsid() so the child becomes session/group leader and drops the
 *      controlling terminal;
 *   3. fork again and let the first child exit, so the surviving process can
 *      never re-acquire a controlling terminal;
 *   4. change to the working directory and reset the file-creation mask.
 *
 * Exits the process with status 0 in the parent / first child and with
 * status 1 if either fork() fails. Only the final daemon process returns.
 */
void init_daemon(void)
{
    /* The fork result must be tested for failure first: -1 is non-zero, so a
     * plain "if (pid = fork())" would treat a failed fork as "I am the
     * parent" and exit with status 0. */
    pid_t pid = fork();
    if (pid < 0)
        exit(1);    /* fork failed */
    if (pid > 0)
        exit(0);    /* parent: done */

    /* First child: become session and process-group leader, detach from the
     * controlling terminal. */
    setsid();

    pid = fork();
    if (pid < 0)
        exit(1);    /* fork failed */
    if (pid > 0)
        exit(0);    /* first child: done */

    /* Second child (the daemon). Several instances of this process may run
     * side by side from the same directory. A failed chdir is reported but
     * is not fatal, matching the previous best-effort behaviour. */
    if (chdir("/FeProject/bin/") != 0)
        perror("chdir /FeProject/bin/");

    umask(0);       /* reset the file-creation mask */
}
/* Global variables */
thread_info_t thread_info[THREAD_CONNECTIONS]; // bookkeeping array, one entry per thread slot
pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // global mutex (taken by restart_thread)
extern SafeMessageQueue message_queue; // received-message queue, defined elsewhere
// 生成测试装置
std::vector<DeviceInfo> generate_test_devices(int count) {
std::vector<DeviceInfo> devices;
for (int i = 1; i <= count; ++i) {
// 生成装置ID和名称
std::string dev_id = "D" + std::to_string(1000 + i).substr(1); // D001, D002, ..., D100
std::string dev_name = "Device " + std::to_string(i);
// 生成测点
std::vector<PointInfo> points = {
{
"P" + dev_id.substr(1) + "01", // 测点ID如 P00101
"Voltage " + dev_name,
dev_id,
1,
0.0, // 随机电压值
0.0,
100.0,
80.0
},
{
"P" + dev_id.substr(1) + "02", // 测点ID如 P00102
"Current " + dev_name,
dev_id,
2,
0.0, // 随机电流值
0.0,
20.0,
15.0
}
};
// 添加装置
devices.push_back({
dev_id,
dev_name,
(i % 2 == 0) ? "Model-X" : "Model-Y", // 交替使用两种型号
"00-B7-8D-A8-00-D6", // 随机MAC地址
1, // 状态 (1=在线)
points
});
}
return devices;
}
// Dump every device and each of its measurement points to stdout in a
// human-readable, line-per-field layout.
void PrintDevices(const std::vector<DeviceInfo>& devices) {
    std::cout << "==== Devices List (" << devices.size() << ") ====\n";

    for (const auto& device : devices) {
        // Device header fields.
        std::cout << "Device ID : " << device.device_id << "\n"
                  << "Name : " << device.name << "\n"
                  << "Model : " << device.model << "\n"
                  << "MAC : " << device.mac << "\n"
                  << "Status : " << device.status << "\n"
                  << "Points (" << device.points.size() << "):\n";

        // One sub-record per measurement point, each closed by a dashed rule.
        for (const auto& point : device.points) {
            std::cout << " Point ID : " << point.point_id << "\n"
                      << " Name : " << point.name << "\n"
                      << " Device ID : " << point.device_id << "\n"
                      << " Cpu No : " << point.nCpuNo << "\n"
                      << " PT1 : " << point.PT1 << "\n"
                      << " PT2 : " << point.PT2 << "\n"
                      << " CT1 : " << point.CT1 << "\n"
                      << " CT2 : " << point.CT2 << "\n"
                      << " Scale : " << point.strScale << "\n"
                      << " PTType : " << point.nPTType << "\n"
                      << "----------------------\n";
        }

        std::cout << "==========================\n";
    }
}
/* 线程工作函数 0号子线程*/
/* 客户端连接管理线程函数*/
void* client_manager_thread(void* arg) {
int index = *(int*)arg;
free(arg);
// 更新线程状态为运行中
pthread_mutex_lock(&thread_info[index].lock);
printf("Client Manager Thread %d started\n", index);
thread_info[index].state = THREAD_RUNNING;
pthread_mutex_unlock(&thread_info[index].lock);
printf("Started client connections\n");
// 创建测点数据
//std::vector<PointInfo> points1 = {
// {"P001", "Main Voltage", "D001",1 ,1, 1, 1, 1,"0.38k",0},
// {"P002", "Backup Voltage", "D001",2 ,1, 1, 1, 1,"0.38k",0}
//};
//std::vector<PointInfo> points2 = {
// {"P003", "Main Voltage", "D002",1 ,1, 1, 1, 1,"0.38k",0},
// {"P004", "Backup Voltage", "D002",2 ,1, 1, 1, 1,"0.38k",0}
//};
////00B78DA800D6 00-B7-8D-01-79-06 00-B7-8D-A8-00-D6 00-B7-8D-01-71-09 00-B7-8D-01-88-7f
//// 创建装置列表
//std::vector<DeviceInfo> devices = {
// {
// "D001", "Primary Device", "Model-X", "00-B7-8D-00-BB-03",
// 1, points1,true
// }
//};
// 生成100个测试装置
//std::vector<DeviceInfo> test_devices = generate_test_devices(100);
//lnk从台账读取设备
std::vector<DeviceInfo> devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk添加
//台账打印
PrintDevices(devices);
// 启动客户端连接
start_client_connect(devices);
printf("Stopped all client connections\n");
// 线程终止处理
pthread_mutex_lock(&thread_info[index].lock);
thread_info[index].state = THREAD_STOPPED;
printf("Client Manager Thread %d stopped\n", index);
pthread_mutex_unlock(&thread_info[index].lock);
return NULL;
}
/* 线程工作函数 1号子线程*/
/* 消息处理线程函数 */
void* message_processor_thread(void* arg) {
int index = *(int*)arg;
free(arg);
// 更新线程状态为运行中
pthread_mutex_lock(&thread_info[index].lock);
printf("Message Processor Thread %d started\n", index);
thread_info[index].state = THREAD_RUNNING;
pthread_mutex_unlock(&thread_info[index].lock);
// 消息处理循环
while (1) {
deal_message_t msg;
if (message_queue.pop(msg)) {
// 实际消息处理逻辑
// 注意这里需要根据msg.client_index区分客户端
// 处理完成后释放内存
// 调用实际的消息处理函数
process_received_message(msg.mac, msg.device_id, msg.data, msg.length);
free(msg.data);
}
else {
// 队列为空时短暂休眠100微秒 = 0.1毫秒)
usleep(100);
}
}
// 线程终止处理
pthread_mutex_lock(&thread_info[index].lock);
thread_info[index].state = THREAD_STOPPED;
printf("Message Processor Thread %d stopped\n", index);
pthread_mutex_unlock(&thread_info[index].lock);
return NULL;
}
/* 线程重启函数 */
void restart_thread(int index) {
//lnk20251208
pthread_t old_tid = 0;
pthread_mutex_lock(&global_lock);
if (thread_info[index].state == THREAD_RESTARTING) {
pthread_mutex_unlock(&global_lock);
return; // 避免重复重启
}
// 如果之前是 STOPPED并且 tid 非空可以回收一下旧线程防止资源泄露lnk20251208
if (thread_info[index].state == THREAD_STOPPED && thread_info[index].tid) {
old_tid = thread_info[index].tid;
thread_info[index].tid = 0;
}
thread_info[index].state = THREAD_RESTARTING;
printf("Restarting thread %d\n", index);
pthread_mutex_unlock(&global_lock);
// 在锁外 join避免阻塞其他线程
if (old_tid) {
pthread_join(old_tid, NULL);
}
// ========== 创建新线程 ==========lnk20251208
if (index == 0) {
// 接口 + MQcloudfrontthread 不再需要参数,直接传 NULL
if (pthread_create(&thread_info[index].tid, NULL,
cloudfrontthread, NULL) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart cloudfrontthread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
}
} else if (index == 1) {
// 客户端管理线程
int* new_index = (int*)malloc(sizeof(int));
if (!new_index) {
pthread_mutex_lock(&global_lock);
printf("Failed to malloc for client manager thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
return;
}
*new_index = index;
if (pthread_create(&thread_info[index].tid, NULL,
client_manager_thread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart client manager thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index); // 失败才自己 free成功时在线程里 free
}
} else if (index == 2) {
// 消息处理线程
int* new_index = (int*)malloc(sizeof(int));
if (!new_index) {
pthread_mutex_lock(&global_lock);
printf("Failed to malloc for message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
return;
}
*new_index = index;
if (pthread_create(&thread_info[index].tid, NULL,
message_processor_thread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index);
}
} else {
// 其他工作线程暂不重启
pthread_mutex_lock(&global_lock);
printf("Thread %d is not configured for restart\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
}
/*// 创建新线程
int* new_index = (int*)malloc(sizeof(int));
*new_index = index;
if (index == 1) {
// 客户端管理线程
if (pthread_create(&thread_info[index].tid, NULL, client_manager_thread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart client manager thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index);
}
}
else if (index == 2) {
// 消息处理线程
if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
free(new_index);
}
}
else if (index == 0) {
// 接口mq
//char* argv[] = { (char*)new_index };//这里需要构造进程号参数传入
// ThreadArgs* args = new ThreadArgs{1, argv};
if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, new_index) != 0) {
pthread_mutex_lock(&global_lock);
printf("Failed to restart message processor thread %d\n", index);
thread_info[index].state = THREAD_CRASHED;
pthread_mutex_unlock(&global_lock);
//delete args; // 如果线程没创建成功就手动释放
free(new_index);
}
}
else {
// 其他工作线程
// 这里简化为空,实际应用中可添加其他线程
}*/
}
/*
 * Liveness probe for a joinable thread.
 *
 * Returns 1 if pthread_tryjoin_np() reports EBUSY (the thread has not
 * terminated yet) and 0 for every other result. Note that a result of 0 from
 * pthread_tryjoin_np() means the thread has just been joined by this call,
 * so the handle must not be joined again afterwards.
 */
int is_thread_alive(pthread_t tid) {
    const int join_status = pthread_tryjoin_np(tid, NULL);
    if (join_status == EBUSY) {
        return 1;
    }
    return 0;
}
//lnk参数
/*ThreadArgs* make_thread_args_from_strs(const std::vector<std::string>& args_vec) {
char** argv = new char*[args_vec.size() + 1]; // 多一个 nullptr 结尾
for (size_t i = 0; i < args_vec.size(); ++i) {
argv[i] = strdup(args_vec[i].c_str()); // strdup 是 malloc 出来的
}
argv[args_vec.size()] = nullptr;
return new ThreadArgs{static_cast<int>(args_vec.size()), argv};
}*/
/*
 * Program entry point.
 *
 * argc/argv are forwarded to parse_param() (per-process parameters for
 * multi-process deployment). Returns 1 if the parameters are rejected or if
 * the cloudfront thread cannot be created; otherwise it never returns.
 *
 * Sequence:
 *   1. validate parameters and daemonize;
 *   2. initialise every thread_info[] slot;
 *   3. start cloudfrontthread in slot 0 and wait until INITFLAG becomes
 *      non-zero;
 *   4. start the client manager (slot 1) and message processor (slot 2);
 *   5. once per MONITOR_INTERVAL seconds, check every slot and hand slots
 *      that are STOPPED or CRASHED to restart_thread(); every 60 ticks also
 *      print the message queue size.
 *
 * NOTE(review): slot 0 is only treated as running if cloudfrontthread
 * (defined elsewhere) updates thread_info[0].state itself -- confirm;
 * otherwise the monitor will see slot 0 as STOPPED and call
 * restart_thread(0), which joins the old thread.
 */
int main(int argc ,char** argv) {
    if (!parse_param(argc, argv)) {
        std::cerr << "process param error,exit" << std::endl;
        return 1;
    }

    init_daemon();
    srand(time(NULL));   // seed the C random number generator

    // Initialise every slot. tid is cleared explicitly so that a slot whose
    // thread was never created is never mistaken for a joinable thread.
    for (int i = 0; i < THREAD_CONNECTIONS; i++) {
        thread_info[i].index = i;
        thread_info[i].tid = 0;
        thread_info[i].state = THREAD_STOPPED;
        pthread_mutex_init(&thread_info[i].lock, NULL);
    }

    // Slot 0: interface + MQ thread. Nothing below can make progress without
    // it (the INITFLAG wait would never end), so failing to create it is fatal.
    if (pthread_create(&thread_info[0].tid, NULL, cloudfrontthread, NULL) != 0) {
        printf("Failed to create cloudfront thread 0\n");
        return 1;
    }

    while (!INITFLAG) {
        std::this_thread::sleep_for(std::chrono::seconds(3));
        std::cout << "waiting cloudfront initialize ..." << std::endl;
    }

    // Start the initial worker set. Only slots 1 and 2 have a worker; the
    // remaining slots are reserved and get neither an allocation nor a thread.
    for (int i = 1; i < THREAD_CONNECTIONS; i++) {
        void* (*entry)(void*) = NULL;
        const char* label = NULL;

        if (i == 1) {
            entry = client_manager_thread;
            label = "client manager";
        } else if (i == 2) {
            entry = message_processor_thread;
            label = "message processor";
        }
        if (entry == NULL) {
            continue;
        }

        // The worker takes ownership of this allocation and frees it.
        int* index = (int*)malloc(sizeof(int));
        if (index == NULL) {
            // Slot stays STOPPED, so the monitor loop will retry it.
            printf("Failed to allocate index for %s thread %d\n", label, i);
            continue;
        }
        *index = i;

        if (pthread_create(&thread_info[i].tid, NULL, entry, index) != 0) {
            printf("Failed to create %s thread %d\n", label, i);
            // The handle is not meaningful after a failed create; clear it so
            // restart_thread() does not try to join it.
            thread_info[i].tid = 0;
            free(index);
        }
    }

    printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS);

    // Main monitoring loop.
    int queue_monitor = 0;   // ticks since the queue size was last printed
    while (1) {
        sleep(MONITOR_INTERVAL);

        for (int i = 0; i < THREAD_CONNECTIONS; i++) {
            pthread_mutex_lock(&thread_info[i].lock);

            // A slot that claims to be running but whose thread has ended
            // is treated as crashed.
            if (thread_info[i].state == THREAD_RUNNING && !is_thread_alive(thread_info[i].tid)) {
                printf("Thread %d crashed unexpectedly\n", i);
                thread_info[i].state = THREAD_CRASHED;
            }

            const bool needs_restart =
                thread_info[i].state == THREAD_STOPPED ||
                thread_info[i].state == THREAD_CRASHED;

            // Release the slot lock before restarting.
            pthread_mutex_unlock(&thread_info[i].lock);

            if (needs_restart) {
                restart_thread(i);
            }
        }

        // Periodically report the depth of the receive queue.
        if (++queue_monitor >= 60) {
            printf("Message queue size: %zu\n", message_queue.size());
            queue_monitor = 0;
        }
    }

    // Resource cleanup (not reached while the loop above is endless).
    for (int i = 0; i < THREAD_CONNECTIONS; i++) {
        pthread_mutex_destroy(&thread_info[i].lock);
    }
    return 0;
}