import type { MqttClient, OnMessageCallback, IClientOptions, IClientSubscribeOptions } from 'mqtt'; import mqtt from 'mqtt'; interface MQTTOptions { protocolId?: string; qos?: 0 | 1 | 2; clean?: boolean; connectTimeout?: number; clientId?: string; username?: string; password?: string; reconnectPeriod?: number; // 重连间隔(ms) maxReconnectTimes?: number; // 最大重连次数 } class MQTT { private topic: string; private client: MqttClient | null = null; private isConnected: boolean = false; private reconnectCount: number = 0; private maxReconnectTimes: number; private reconnectPeriod: number; private isManuallyDisconnected: boolean = false; private defaultOptions: MQTTOptions = { protocolId: 'MQTT', qos: 2, clean: true, connectTimeout: 30 * 1000, clientId: `mqttjs_${Math.random().toString(16).substr(2, 8)}`, username: 't_user', password: 'njcnpqs', reconnectPeriod: 1000, // 默认1秒重试一次 maxReconnectTimes: 3, // 默认最大重连5次 }; constructor(topic: string, options: MQTTOptions = {}) { this.topic = topic; this.maxReconnectTimes = options.maxReconnectTimes || this.defaultOptions.maxReconnectTimes!; this.reconnectPeriod = options.reconnectPeriod || this.defaultOptions.reconnectPeriod!; // 合并选项 this.defaultOptions = { ...this.defaultOptions, ...options }; } /** * 初始化 MQTT 客户端 * @returns Promise */ async init(): Promise { if (this.client) { throw new Error('MQTT 客户端已初始化'); } try { const response = await fetch('/'); const mqttUrl = response.headers.get('X-Mqtt-Url') || 'ws://192.168.1.68:8083/mqtt'; this.client = mqtt.connect(mqttUrl, this.defaultOptions as IClientOptions); this.setupEventListeners(); // 等待连接成功或超时 return new Promise((resolve, reject) => { const timeout = setTimeout(() => { if (!this.isConnected) { reject(new Error('MQTT 连接超时')); } }, this.defaultOptions.connectTimeout); this.client?.on('connect', () => { clearTimeout(timeout); this.isConnected = true; this.reconnectCount = 0; // 连接成功重置重连计数 resolve(); }); this.client?.on('error', (error) => { clearTimeout(timeout); console.error('MQTT 连接错误:', error); reject(error); }); }); } catch (error) { console.error('初始化 MQTT 失败:', error); throw error; } } /** * 设置事件监听器 */ private setupEventListeners(): void { if (!this.client) return; this.client.on('close', () => { console.log('MQTT 连接已关闭'); this.isConnected = false; }); this.client.on('offline', () => { console.log('MQTT 客户端离线'); this.isConnected = false; }); this.client.on('reconnect', () => { console.log(`MQTT 正在尝试重连 (${this.reconnectCount + 1}/${this.maxReconnectTimes})...`); // 检查是否超过最大重连次数 if (this.reconnectCount >= this.maxReconnectTimes) { console.log('已达到最大重连次数,停止重连'); this.client?.end(true); this.client = null; return; } this.reconnectCount++; }); } /** * 订阅主题 * @param subscribeOptions 可选的订阅选项 * @returns Promise */ async subscribe(subscribeOptions: IClientSubscribeOptions = {}): Promise { if (!this.client || !this.isConnected) { throw new Error('MQTT 客户端未连接'); } return new Promise((resolve, reject) => { this.client?.subscribe( this.topic, { qos: this.defaultOptions.qos, ...subscribeOptions }, (error) => { if (error) { console.error('订阅失败:', error); reject(error); } else { console.log('订阅成功'); resolve(); } } ); }); } /** * 取消订阅 * @returns Promise */ async unsubscribe(): Promise { if (!this.client || !this.isConnected) { throw new Error('MQTT 客户端未连接'); } return new Promise((resolve, reject) => { this.client?.unsubscribe(this.topic, (error) => { if (error) { console.error('取消订阅失败:', error); reject(error); } else { console.log('取消订阅成功'); resolve(); } }); }); } /** * 设置消息回调 * @param callback 消息回调函数 */ onMessage(callback: OnMessageCallback): void { if (!this.client) { throw new Error('MQTT 客户端未初始化'); } this.client.on('message', callback); } /** * 发布消息 * @param message 要发布的消息 * @param options 发布选项 * @returns Promise */ async publish( message: string | Buffer, options: { qos?: 0 | 1 | 2; retain?: boolean } = {} ): Promise { if (!this.client || !this.isConnected) { throw new Error('MQTT 客户端未连接'); } return new Promise((resolve, reject) => { this.client?.publish( this.topic, message, { qos: this.defaultOptions.qos, ...options }, (error) => { if (error) { console.error('消息发布失败:', error); reject(error); } else { console.log('消息发布成功'); resolve(); } } ); }); } /** * 断开连接 * @param force 是否强制断开 */ disconnect(force: boolean = false): void { this.isManuallyDisconnected = true; if (this.client) { this.client.end(force, () => { console.log('MQTT 连接已断开'); this.isConnected = false; this.client = null; }); } } /** * 检查连接状态 * @returns boolean */ isConnectedToBroker(): boolean { return this.isConnected; } /** * 获取当前重连次数 * @returns number */ getReconnectCount(): number { return this.reconnectCount; } /** * 重置重连计数器 */ resetReconnectCount(): void { this.reconnectCount = 0; } } export default MQTT;