import type { MqttClient, OnMessageCallback, IClientOptions, IClientSubscribeOptions } from 'mqtt' import mqtt from 'mqtt' interface MQTTOptions { protocolId?: string qos?: 0 | 1 | 2 clean?: boolean connectTimeout?: number clientId?: string username?: string password?: string reconnectPeriod?: number // 重连间隔(ms) maxReconnectTimes?: number // 最大重连次数 } class MQTT { private topic: string private client: MqttClient | null = null private isConnected: boolean = false private reconnectCount: number = 0 private maxReconnectTimes: number private reconnectPeriod: number private isManuallyDisconnected: boolean = false private defaultOptions: MQTTOptions = { protocolId: 'MQTT', qos: 1, clean: true, connectTimeout: 30 * 1000, clientId: `mqttjs_${Math.random().toString(16).substr(2, 8)}`, username: 't_user', password: 'njcnpqs', reconnectPeriod: 1000, // 默认1秒重试一次 maxReconnectTimes: 3 // 默认最大重连5次 } constructor(topic: string, options: MQTTOptions = {}) { this.topic = topic this.maxReconnectTimes = options.maxReconnectTimes || this.defaultOptions.maxReconnectTimes! this.reconnectPeriod = options.reconnectPeriod || this.defaultOptions.reconnectPeriod! // 合并选项 this.defaultOptions = { ...this.defaultOptions, ...options } } /** * 初始化 MQTT 客户端 * @returns Promise */ async init(): Promise { if (this.client) { throw new Error('MQTT 客户端已初始化') } try { // const mqttUrl = 'ws://192.168.1.103:8083/mqtt' const mqttUrl = localStorage.getItem('MQTTZUTAI') == 'null' ? 'ws://192.168.1.24:8085/mqtt' : localStorage.getItem('MQTTZUTAI') console.log('🚀 ~ MQTT ~ init ~ mqttUrl:', mqttUrl) this.client = mqtt.connect(mqttUrl, this.defaultOptions as IClientOptions) this.setupEventListeners() // 等待连接成功或超时 return new Promise((resolve, reject) => { const timeout = setTimeout(() => { if (!this.isConnected) { reject(new Error('MQTT 连接超时')) } }, this.defaultOptions.connectTimeout) this.client?.on('connect', () => { clearTimeout(timeout) this.isConnected = true this.reconnectCount = 0 // 连接成功重置重连计数 resolve() }) this.client?.on('error', error => { clearTimeout(timeout) console.error('MQTT 连接错误:', error) reject(error) }) }) } catch (error) { console.error('初始化 MQTT 失败:', error) throw error } } /** * 设置事件监听器 */ private setupEventListeners(): void { if (!this.client) return this.client.on('close', () => { console.log('MQTT 连接已关闭') this.isConnected = false }) this.client.on('offline', () => { console.log('MQTT 客户端离线') this.isConnected = false }) this.client.on('reconnect', () => { console.log(`MQTT 正在尝试重连 (${this.reconnectCount + 1}/${this.maxReconnectTimes})...`) // 检查是否超过最大重连次数 if (this.reconnectCount >= this.maxReconnectTimes) { console.log('已达到最大重连次数,停止重连') this.client?.end(true) this.client = null return } this.reconnectCount++ }) } /** * 订阅主题 * @param subscribeOptions 可选的订阅选项 * @returns Promise */ async subscribe(subscribe: string, subscribeOptions: IClientSubscribeOptions = { qos: 1 }): Promise { if (!this.client || !this.isConnected) { throw new Error('MQTT 客户端未连接') } return new Promise((resolve, reject) => { this.client?.subscribe(subscribe, subscribeOptions, error => { if (error) { console.error('订阅失败:', error) reject(error) } else { console.log('订阅成功') resolve() } }) }) } /** * 取消订阅 * @returns Promise */ async unsubscribe(subscribe: string): Promise { if (!this.client || !this.isConnected) { throw new Error('MQTT 客户端未连接') } return new Promise((resolve, reject) => { this.client?.unsubscribe(subscribe, error => { if (error) { console.error('取消订阅失败:', error) reject(error) } else { console.log('取消订阅成功') resolve() } }) }) } /** * 设置消息回调 * @param callback 消息回调函数 */ onMessage(callback: OnMessageCallback): void { if (!this.client) { throw new Error('MQTT 客户端未初始化') } this.client.on('message', callback) } /** * 发布消息 * topic: string, 发送地址 * @param message 要发布的消息 * @param options 发布选项 * @returns Promise */ async publish( topic: string, message: any, options: { qos?: 0 | 1 | 2; retain?: boolean } = { qos: 1 } ): Promise { if (!this.client || !this.isConnected) { throw new Error('MQTT 客户端未连接') } return new Promise((resolve, reject) => { this.client?.publish(topic, message, options, error => { if (error) { console.error('消息发布失败:', error) reject(error) } else { console.log('消息发布成功') resolve() } }) }) } /** * 断开连接 * @param force 是否强制断开 */ disconnect(force: boolean = false): void { this.isManuallyDisconnected = true if (this.client) { this.client.end(force, () => { console.log('MQTT 连接已断开') this.isConnected = false this.client = null }) } } /** * 检查连接状态 * @returns boolean */ isConnectedToBroker(): boolean { return this.isConnected } /** * 获取当前重连次数 * @returns number */ getReconnectCount(): number { return this.reconnectCount } /** * 重置重连计数器 */ resetReconnectCount(): void { this.reconnectCount = 0 } } export default MQTT