429 lines
14 KiB
C++
429 lines
14 KiB
C++
#include <pthread.h>
|
||
#include <stdio.h>
|
||
#include <stdlib.h>
|
||
#include <unistd.h>
|
||
#include <errno.h>
|
||
#include <time.h>
|
||
#include "client2.h"
|
||
|
||
#include "cloudfront/code/interface.h"
|
||
#include <iostream>
|
||
#include <thread>
|
||
#include <chrono>
|
||
|
||
using namespace std;
|
||
#if 0
|
||
/* 常量定义 */
|
||
#define THREAD_CONNECTIONS 10 // 最大线程数
|
||
#define MONITOR_INTERVAL 1 // 监控间隔(秒)
|
||
|
||
/* 线程状态枚举 */
|
||
typedef enum {
|
||
THREAD_RUNNING, // 0:运行中
|
||
THREAD_STOPPED, // 1:正常停止
|
||
THREAD_RESTARTING, // 2:重启中
|
||
THREAD_CRASHED // 3:异常崩溃
|
||
} thread_state_t;
|
||
|
||
/* 线程控制结构体 */
|
||
typedef struct {
|
||
pthread_t tid; // 线程ID
|
||
int index; // 线程编号(0~CONNECTIONS-1)
|
||
thread_state_t state; // 当前状态
|
||
pthread_mutex_t lock; // 线程专用互斥锁
|
||
} thread_info_t;
|
||
#endif
|
||
|
||
extern int INITFLAG;//台账等初始化完成标志
|
||
extern void cleanup_args(ThreadArgs* args);
|
||
|
||
void init_daemon(void)
|
||
{
|
||
int pid;
|
||
int i;
|
||
|
||
if( pid = fork() )
|
||
exit(0); /** 是父进程,结束父进程 */
|
||
else if( pid < 0 )
|
||
exit(1); /** fork失败,退出 */
|
||
|
||
/** 是第一子进程,后台继续执行 */
|
||
|
||
setsid(); /** 第一子进程成为新的会话组长和进程组长并与控制终端分离 */
|
||
|
||
if( pid = fork() )
|
||
exit(0); /** 是第一子进程,结束第一子进程 */
|
||
else if( pid < 0)
|
||
exit(1); /** fork失败,退出 */
|
||
|
||
chdir("/FeProject/bin/"); //multi process running at same time
|
||
umask(0); /** 重设文件创建掩码 */
|
||
|
||
return;
|
||
}
|
||
|
||
/* 全局变量 */
|
||
thread_info_t thread_info[THREAD_CONNECTIONS]; // 线程信息数组
|
||
pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; // 全局互斥锁
|
||
extern SafeMessageQueue message_queue;
|
||
// 生成测试装置
|
||
std::vector<DeviceInfo> generate_test_devices(int count) {
|
||
std::vector<DeviceInfo> devices;
|
||
|
||
for (int i = 1; i <= count; ++i) {
|
||
// 生成装置ID和名称
|
||
std::string dev_id = "D" + std::to_string(1000 + i).substr(1); // D001, D002, ..., D100
|
||
std::string dev_name = "Device " + std::to_string(i);
|
||
|
||
// 生成测点
|
||
std::vector<PointInfo> points = {
|
||
{
|
||
"P" + dev_id.substr(1) + "01", // 测点ID如 P00101
|
||
"Voltage " + dev_name,
|
||
dev_id,
|
||
1,
|
||
0.0, // 随机电压值
|
||
0.0,
|
||
100.0,
|
||
80.0
|
||
},
|
||
{
|
||
"P" + dev_id.substr(1) + "02", // 测点ID如 P00102
|
||
"Current " + dev_name,
|
||
dev_id,
|
||
2,
|
||
0.0, // 随机电流值
|
||
0.0,
|
||
20.0,
|
||
15.0
|
||
}
|
||
};
|
||
|
||
// 添加装置
|
||
devices.push_back({
|
||
dev_id,
|
||
dev_name,
|
||
(i % 2 == 0) ? "Model-X" : "Model-Y", // 交替使用两种型号
|
||
"00-B7-8D-A8-00-D6", // 随机MAC地址
|
||
1, // 状态 (1=在线)
|
||
points
|
||
});
|
||
}
|
||
|
||
return devices;
|
||
}
|
||
|
||
void PrintDevices(const std::vector<DeviceInfo>& devices) {
|
||
std::cout << "==== Devices List (" << devices.size() << ") ====\n";
|
||
for (const auto& dev : devices) {
|
||
std::cout << "Device ID : " << dev.device_id << "\n";
|
||
std::cout << "Name : " << dev.name << "\n";
|
||
std::cout << "Model : " << dev.model << "\n";
|
||
std::cout << "MAC : " << dev.mac << "\n";
|
||
std::cout << "Status : " << dev.status << "\n";
|
||
std::cout << "Points (" << dev.points.size() << "):\n";
|
||
for (const auto& pt : dev.points) {
|
||
std::cout << " Point ID : " << pt.point_id << "\n";
|
||
std::cout << " Name : " << pt.name << "\n";
|
||
std::cout << " Device ID : " << pt.device_id << "\n";
|
||
std::cout << " Cpu No : " << pt.nCpuNo << "\n";
|
||
std::cout << " PT1 : " << pt.PT1 << "\n";
|
||
std::cout << " PT2 : " << pt.PT2 << "\n";
|
||
std::cout << " CT1 : " << pt.CT1 << "\n";
|
||
std::cout << " CT2 : " << pt.CT2 << "\n";
|
||
std::cout << " Scale : " << pt.strScale << "\n";
|
||
std::cout << " PTType : " << pt.nPTType << "\n";
|
||
std::cout << "----------------------\n";
|
||
}
|
||
std::cout << "==========================\n";
|
||
}
|
||
}
|
||
|
||
/* 线程工作函数 0号子线程*/
|
||
/* 客户端连接管理线程函数*/
|
||
void* client_manager_thread(void* arg) {
|
||
int index = *(int*)arg;
|
||
free(arg);
|
||
|
||
// 更新线程状态为运行中
|
||
pthread_mutex_lock(&thread_info[index].lock);
|
||
printf("Client Manager Thread %d started\n", index);
|
||
thread_info[index].state = THREAD_RUNNING;
|
||
pthread_mutex_unlock(&thread_info[index].lock);
|
||
|
||
printf("Started client connections\n");
|
||
|
||
// 创建测点数据
|
||
std::vector<PointInfo> points1 = {
|
||
{"P001", "Main Voltage", "D001",1 ,1, 1, 1, 1,"0.38k",0},
|
||
{"P002", "Backup Voltage", "D001",2 ,1, 1, 1, 1,"0.38k",0}
|
||
};
|
||
//00B78DA800D6 00-B7-8D-01-79-06
|
||
// 创建装置列表
|
||
std::vector<DeviceInfo> devices = {
|
||
{
|
||
"D001", "Primary Device", "Model-X", "00-B7-8D-01-79-06",
|
||
1, points1,true
|
||
}
|
||
};
|
||
|
||
// 生成100个测试装置
|
||
//std::vector<DeviceInfo> test_devices = generate_test_devices(100);
|
||
|
||
//lnk从台账读取设备
|
||
//std::vector<DeviceInfo> devices = GenerateDeviceInfoFromLedger(terminal_devlist);//lnk添加
|
||
|
||
//台账打印
|
||
//PrintDevices(devices);
|
||
|
||
// 启动客户端连接
|
||
start_client_connect(devices);
|
||
|
||
printf("Stopped all client connections\n");
|
||
|
||
// 线程终止处理
|
||
pthread_mutex_lock(&thread_info[index].lock);
|
||
thread_info[index].state = THREAD_STOPPED;
|
||
printf("Client Manager Thread %d stopped\n", index);
|
||
pthread_mutex_unlock(&thread_info[index].lock);
|
||
return NULL;
|
||
}
|
||
/* 线程工作函数 1号子线程*/
|
||
/* 消息处理线程函数 */
|
||
void* message_processor_thread(void* arg) {
|
||
int index = *(int*)arg;
|
||
free(arg);
|
||
|
||
// 更新线程状态为运行中
|
||
pthread_mutex_lock(&thread_info[index].lock);
|
||
printf("Message Processor Thread %d started\n", index);
|
||
thread_info[index].state = THREAD_RUNNING;
|
||
pthread_mutex_unlock(&thread_info[index].lock);
|
||
|
||
// 消息处理循环
|
||
while (1) {
|
||
deal_message_t msg;
|
||
if (message_queue.pop(msg)) {
|
||
// 实际消息处理逻辑
|
||
// 注意:这里需要根据msg.client_index区分客户端
|
||
// 处理完成后释放内存
|
||
|
||
// 调用实际的消息处理函数
|
||
process_received_message(msg.mac, msg.device_id, msg.data, msg.length);
|
||
|
||
free(msg.data);
|
||
}
|
||
else {
|
||
// 队列为空时短暂休眠(100微秒 = 0.1毫秒)
|
||
usleep(100);
|
||
}
|
||
}
|
||
|
||
// 线程终止处理
|
||
pthread_mutex_lock(&thread_info[index].lock);
|
||
thread_info[index].state = THREAD_STOPPED;
|
||
printf("Message Processor Thread %d stopped\n", index);
|
||
pthread_mutex_unlock(&thread_info[index].lock);
|
||
return NULL;
|
||
}
|
||
/* 线程重启函数 */
|
||
void restart_thread(int index) {
|
||
pthread_mutex_lock(&global_lock);
|
||
if (thread_info[index].state == THREAD_RESTARTING) {
|
||
pthread_mutex_unlock(&global_lock);
|
||
return; // 避免重复重启
|
||
}
|
||
|
||
thread_info[index].state = THREAD_RESTARTING;
|
||
printf("Restarting thread %d\n", index);
|
||
pthread_mutex_unlock(&global_lock);
|
||
|
||
// 创建新线程
|
||
int* new_index = (int*)malloc(sizeof(int));
|
||
*new_index = index;
|
||
|
||
if (index == 1) {
|
||
// 客户端管理线程
|
||
if (pthread_create(&thread_info[index].tid, NULL, client_manager_thread, new_index) != 0) {
|
||
pthread_mutex_lock(&global_lock);
|
||
printf("Failed to restart client manager thread %d\n", index);
|
||
thread_info[index].state = THREAD_CRASHED;
|
||
pthread_mutex_unlock(&global_lock);
|
||
free(new_index);
|
||
}
|
||
}
|
||
else if (index == 2) {
|
||
// 消息处理线程
|
||
if (pthread_create(&thread_info[index].tid, NULL, message_processor_thread, new_index) != 0) {
|
||
pthread_mutex_lock(&global_lock);
|
||
printf("Failed to restart message processor thread %d\n", index);
|
||
thread_info[index].state = THREAD_CRASHED;
|
||
pthread_mutex_unlock(&global_lock);
|
||
free(new_index);
|
||
}
|
||
}
|
||
else if (index == 0) {
|
||
// 接口,mq
|
||
char* argv[] = { (char*)new_index };//这里需要构造进程号参数传入
|
||
ThreadArgs* args = new ThreadArgs{1, argv};
|
||
if (pthread_create(&thread_info[index].tid, NULL, cloudfrontthread, args) != 0) {
|
||
pthread_mutex_lock(&global_lock);
|
||
printf("Failed to restart message processor thread %d\n", index);
|
||
thread_info[index].state = THREAD_CRASHED;
|
||
pthread_mutex_unlock(&global_lock);
|
||
delete args; // 如果线程没创建成功就手动释放
|
||
free(new_index);
|
||
}
|
||
}
|
||
else {
|
||
// 其他工作线程
|
||
// 这里简化为空,实际应用中可添加其他线程
|
||
}
|
||
}
|
||
|
||
/* 线程存活检测 */
|
||
int is_thread_alive(pthread_t tid) {
|
||
return pthread_tryjoin_np(tid, NULL) == EBUSY; // EBUSY表示线程仍在运行
|
||
}
|
||
|
||
//lnk参数
|
||
ThreadArgs* make_thread_args_from_strs(const std::vector<std::string>& args_vec) {
|
||
char** argv = new char*[args_vec.size() + 1]; // 多一个 nullptr 结尾
|
||
for (size_t i = 0; i < args_vec.size(); ++i) {
|
||
argv[i] = strdup(args_vec[i].c_str()); // strdup 是 malloc 出来的
|
||
}
|
||
argv[args_vec.size()] = nullptr;
|
||
return new ThreadArgs{static_cast<int>(args_vec.size()), argv};
|
||
}
|
||
|
||
/* 主函数 */
|
||
int main(int argc ,char** argv) {//多进程添加参数
|
||
if(!parse_param(argc,argv)){
|
||
std::cerr << "process param error,exit" << std::endl;
|
||
return 1;
|
||
}
|
||
//init_daemon();
|
||
srand(time(NULL)); // 初始化随机数种子
|
||
|
||
// 初始化线程数组
|
||
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
|
||
thread_info[i].index = i;
|
||
thread_info[i].state = THREAD_STOPPED;
|
||
pthread_mutex_init(&thread_info[i].lock, NULL); // 初始化每个线程的锁
|
||
}
|
||
|
||
//接口和mq
|
||
ThreadArgs* args = make_thread_args_from_strs({ "0" });
|
||
if (pthread_create(&thread_info[0].tid, NULL, cloudfrontthread, args) != 0) {
|
||
printf("Failed to create message processor thread 0\n");
|
||
cleanup_args(args);
|
||
}
|
||
|
||
/*while(!INITFLAG){
|
||
std::this_thread::sleep_for(std::chrono::seconds(3));
|
||
std::cout << "waiting cloudfront initialize ..." << std::endl;
|
||
}*/
|
||
|
||
// 创建初始线程组
|
||
for (int i = 1; i < THREAD_CONNECTIONS; i++) {
|
||
int* index = (int*)malloc(sizeof(int));
|
||
*index = i;
|
||
|
||
if (i == 1) {
|
||
// 客户端管理线程
|
||
if (pthread_create(&thread_info[i].tid, NULL, client_manager_thread, index) != 0) {
|
||
printf("Failed to create client manager thread %d\n", i);
|
||
free(index);
|
||
}
|
||
}
|
||
else if (i == 2) {
|
||
// 消息处理线程
|
||
if (pthread_create(&thread_info[i].tid, NULL, message_processor_thread, index) != 0) {
|
||
printf("Failed to create message processor thread %d\n", i);
|
||
free(index);
|
||
}
|
||
}
|
||
else if (i == 3){
|
||
/*//接口和mq
|
||
char* argv[] = { (char*)index };//这里需要构造进程号参数传入
|
||
ThreadArgs* args = new ThreadArgs{1, argv};
|
||
if (pthread_create(&thread_info[i].tid, NULL, cloudfrontthread, args) != 0) {
|
||
printf("Failed to create message processor thread %d\n", i);
|
||
delete args; // 如果线程没创建成功就手动释放
|
||
free(index);
|
||
}*/
|
||
}
|
||
else {
|
||
// 其他工作线程
|
||
// 这里简化为空,实际应用中可添加其他线程
|
||
free(index);
|
||
}
|
||
|
||
}
|
||
|
||
printf("Thread monitoring system started with %d workers\n", THREAD_CONNECTIONS);
|
||
|
||
// 主监控循环
|
||
while (1) {
|
||
sleep(MONITOR_INTERVAL);
|
||
|
||
// 检查所有线程状态
|
||
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
|
||
pthread_mutex_lock(&thread_info[i].lock);
|
||
|
||
// 检测运行中线程是否崩溃
|
||
if (thread_info[i].state == THREAD_RUNNING && !is_thread_alive(thread_info[i].tid)) {
|
||
printf("Thread %d crashed unexpectedly\n", i);
|
||
thread_info[i].state = THREAD_CRASHED;
|
||
}
|
||
|
||
// 处理需要重启的线程
|
||
if (thread_info[i].state == THREAD_STOPPED || thread_info[i].state == THREAD_CRASHED) {
|
||
pthread_mutex_unlock(&thread_info[i].lock);
|
||
restart_thread(i); // 异步重启
|
||
}
|
||
else {
|
||
pthread_mutex_unlock(&thread_info[i].lock);
|
||
}
|
||
}
|
||
|
||
// 创建测点数据
|
||
std::vector<PointInfo> points2 = {
|
||
{"P101", "Generator Output", "D002",1 ,1, 1, 1, 1,"0.38k",0}
|
||
};
|
||
//00B78DA800D6 00-B7-8D-01-79-06
|
||
// 创建装置列表
|
||
std::vector<DeviceInfo> devices = {
|
||
{
|
||
"D002", "Backup Device", "Model-Y", "00-B7-8D-A8-00-D6",
|
||
1, points2,true
|
||
}
|
||
};
|
||
|
||
// 监控socket队列状态
|
||
static int queue_monitor = 0;
|
||
//static int count = 3;
|
||
if (++queue_monitor >= 20) { // 尝试添加一个设备
|
||
printf("Message queue size: %zu\n", message_queue.size());
|
||
queue_monitor = 0;
|
||
|
||
for (const auto& device : devices) {
|
||
//ClientManager::instance().add_device(device);
|
||
}
|
||
|
||
/*std::vector<DeviceInfo> test_devices = generate_test_devices(count);
|
||
count++;
|
||
|
||
for (const auto& device : test_devices) {
|
||
ClientManager::instance().remove_device("D001");
|
||
}*/
|
||
}
|
||
}
|
||
|
||
// 清理资源(理论上不会执行到这里)
|
||
for (int i = 0; i < THREAD_CONNECTIONS; i++) {
|
||
pthread_mutex_destroy(&thread_info[i].lock);
|
||
}
|
||
return 0;
|
||
}
|