246 lines
7.2 KiB
TypeScript
246 lines
7.2 KiB
TypeScript
import type { MqttClient, OnMessageCallback, IClientOptions, IClientSubscribeOptions } from 'mqtt'
|
|
import mqtt from 'mqtt'
|
|
|
|
/**
 * Options accepted by the `MQTT` wrapper class.
 *
 * The whole object is forwarded to `mqtt.connect`; `qos`, `connectTimeout`
 * and `maxReconnectTimes` are additionally read by the wrapper itself.
 */
interface MQTTOptions {
  /** Protocol identifier forwarded to the MQTT client. */
  protocolId?: string
  /** QoS level used by the wrapper as the default for subscribe and publish. */
  qos?: 0 | 1 | 2
  /** Clean-session flag forwarded to the MQTT client. */
  clean?: boolean
  /** Milliseconds the wrapper waits for the first connection before failing. */
  connectTimeout?: number
  /** Client identifier presented to the broker. */
  clientId?: string
  /** Username used for authentication. */
  username?: string
  /** Password used for authentication. */
  password?: string
  /** Interval between reconnect attempts, in milliseconds. */
  reconnectPeriod?: number
  /** Maximum number of reconnect attempts before the wrapper closes the client. */
  maxReconnectTimes?: number
}
|
|
|
|
/**
 * Wrapper around an mqtt.js client that is bound to a single topic.
 *
 * Connection state and the reconnect counter are maintained from the client's
 * `connect`, `close`, `offline` and `reconnect` events. Once
 * `maxReconnectTimes` reconnect attempts have been made without a successful
 * connection, the client is force-closed and released.
 */
class MQTT {
  private static readonly DEFAULT_CONNECT_TIMEOUT_MS = 30 * 1000
  private static readonly DEFAULT_RECONNECT_PERIOD_MS = 1000
  private static readonly DEFAULT_MAX_RECONNECT_TIMES = 3

  private topic: string
  private client: MqttClient | null = null
  private isConnected: boolean = false
  private reconnectCount: number = 0
  private maxReconnectTimes: number
  private reconnectPeriod: number
  private isManuallyDisconnected: boolean = false
  private defaultOptions: MQTTOptions = {
    protocolId: 'MQTT',
    qos: 2,
    clean: true,
    connectTimeout: MQTT.DEFAULT_CONNECT_TIMEOUT_MS,
    // slice(2, 10) keeps the same 8 hex characters the deprecated substr(2, 8) produced.
    clientId: `mqttjs_${Math.random().toString(16).slice(2, 10)}`,
    username: '',
    password: '',
    reconnectPeriod: MQTT.DEFAULT_RECONNECT_PERIOD_MS, // retry once per second by default
    maxReconnectTimes: MQTT.DEFAULT_MAX_RECONNECT_TIMES // at most 3 reconnect attempts by default
  }

  /**
   * @param topic Topic used by subscribe, unsubscribe and publish.
   * @param options Overrides merged on top of the built-in defaults.
   */
  constructor(topic: string, options: MQTTOptions = {}) {
    this.topic = topic

    // Drop keys that are explicitly `undefined` so they cannot erase a default
    // (e.g. `connectTimeout: undefined` would otherwise make the init timer fire at once).
    const provided = Object.fromEntries(
      Object.entries(options).filter(([, value]) => value !== undefined)
    ) as MQTTOptions

    this.defaultOptions = { ...this.defaultOptions, ...provided }

    // `??` instead of `||` so that an explicit 0 is honoured.
    this.maxReconnectTimes = this.defaultOptions.maxReconnectTimes ?? MQTT.DEFAULT_MAX_RECONNECT_TIMES
    this.reconnectPeriod = this.defaultOptions.reconnectPeriod ?? MQTT.DEFAULT_RECONNECT_PERIOD_MS
  }

  /**
   * Create the MQTT client and wait for the first connection.
   *
   * The broker URL is read from the `MqttUrl` entry of `localStorage`; when it
   * is missing or the literal string `'null'`, the method resolves without
   * creating a client.
   *
   * If the returned promise rejects, the client is left in place (it is not
   * torn down here), so `disconnect()` must be called before `init()` again.
   *
   * @returns Resolves on the first `connect` event.
   * @throws Error if a client already exists; rejects with the client's first
   *   error, or with a timeout error after `connectTimeout` milliseconds.
   */
  async init(): Promise<void> {
    if (this.client) {
      throw new Error('MQTT 客户端已初始化')
    }

    const mqttUrl = localStorage.getItem('MqttUrl')
    console.log('MQTT URL:', mqttUrl)
    if (!mqttUrl || mqttUrl === 'null') {
      return
    }

    // Start every session from a clean slate; otherwise a counter left at the
    // limit by a previous session would stop the very first reconnect attempt.
    this.isManuallyDisconnected = false
    this.reconnectCount = 0

    const client = mqtt.connect(mqttUrl, this.defaultOptions as IClientOptions)
    this.client = client
    this.setupEventListeners()

    const timeoutMs = this.defaultOptions.connectTimeout ?? MQTT.DEFAULT_CONNECT_TIMEOUT_MS

    // Wait for the first connection, the first error, or the timeout.
    return new Promise<void>((resolve, reject) => {
      // Remove the one-shot listeners and the timer once the promise settles,
      // so nothing from this call stays attached to the client.
      const detach = (): void => {
        clearTimeout(timer)
        client.removeListener('connect', onConnect)
        client.removeListener('error', onError)
      }
      const onConnect = (): void => {
        detach()
        resolve()
      }
      const onError = (error: Error): void => {
        detach()
        reject(error)
      }
      const timer = setTimeout(() => {
        detach()
        reject(new Error('MQTT 连接超时'))
      }, timeoutMs)

      client.once('connect', onConnect)
      client.once('error', onError)
    })
  }

  /**
   * Attach the long-lived listeners that keep `isConnected` and
   * `reconnectCount` in sync with the client for its whole lifetime.
   */
  private setupEventListeners(): void {
    const client = this.client
    if (!client) return

    client.on('connect', () => {
      this.isConnected = true
      this.reconnectCount = 0 // a successful connection resets the counter
    })

    // Always keep an 'error' listener attached: an EventEmitter throws when
    // 'error' is emitted and nobody is listening.
    client.on('error', error => {
      console.log('MQTT 连接错误:', error)
    })

    client.on('close', () => {
      console.log('MQTT 连接已关闭')
      this.isConnected = false
    })

    client.on('offline', () => {
      console.log('MQTT 客户端离线')
      this.isConnected = false
    })

    client.on('reconnect', () => {
      // Check the limit before logging so the log never shows "(max+1/max)".
      if (this.reconnectCount >= this.maxReconnectTimes) {
        console.log('已达到最大重连次数,停止重连')
        client.end(true)
        if (this.client === client) {
          this.client = null
        }
        return
      }

      this.reconnectCount++
      console.log(`MQTT 正在尝试重连 (${this.reconnectCount}/${this.maxReconnectTimes})...`)
    })
  }

  /**
   * Return the client if it exists and is currently connected.
   *
   * @throws Error when there is no client or it is not connected.
   */
  private requireConnectedClient(): MqttClient {
    if (!this.client || !this.isConnected) {
      throw new Error('MQTT 客户端未连接')
    }
    return this.client
  }

  /**
   * Subscribe to the configured topic.
   *
   * @param subscribeOptions Optional subscribe options; they override the default QoS.
   * @returns Resolves when the subscribe callback reports no error.
   * @throws Error when the client is not connected; rejects with the subscribe error.
   */
  async subscribe(subscribeOptions: IClientSubscribeOptions = {}): Promise<void> {
    const client = this.requireConnectedClient()

    return new Promise<void>((resolve, reject) => {
      client.subscribe(this.topic, { qos: this.defaultOptions.qos, ...subscribeOptions }, error => {
        if (error) {
          console.log('订阅失败:', error)
          reject(error)
          return
        }
        console.log('订阅成功')
        resolve()
      })
    })
  }

  /**
   * Unsubscribe from the configured topic.
   *
   * @returns Resolves when the unsubscribe callback reports no error.
   * @throws Error when the client is not connected; rejects with the unsubscribe error.
   */
  async unsubscribe(): Promise<void> {
    const client = this.requireConnectedClient()

    return new Promise<void>((resolve, reject) => {
      client.unsubscribe(this.topic, error => {
        if (error) {
          console.log('取消订阅失败:', error)
          reject(error)
          return
        }
        console.log('取消订阅成功')
        resolve()
      })
    })
  }

  /**
   * Register a callback for the client's `message` event.
   *
   * @param callback Invoked by the client for each received message.
   * @throws Error when no client has been created yet.
   */
  onMessage(callback: OnMessageCallback): void {
    if (!this.client) {
      throw new Error('MQTT 客户端未初始化')
    }
    this.client.on('message', callback)
  }

  /**
   * Publish a message to the configured topic.
   *
   * @param message Payload to publish.
   * @param options Publish options; they override the default QoS.
   * @returns Resolves when the publish callback reports no error.
   * @throws Error when the client is not connected; rejects with the publish error.
   */
  async publish(message: string | Buffer, options: { qos?: 0 | 1 | 2; retain?: boolean } = {}): Promise<void> {
    const client = this.requireConnectedClient()

    return new Promise<void>((resolve, reject) => {
      client.publish(this.topic, message, { qos: this.defaultOptions.qos, ...options }, error => {
        if (error) {
          console.log('消息发布失败:', error)
          reject(error)
          return
        }
        console.log('消息发布成功')
        resolve()
      })
    })
  }

  /**
   * Close the connection. Does nothing beyond recording the manual
   * disconnect when no client exists.
   *
   * @param force Passed to the client's `end`; whether to close forcibly.
   */
  disconnect(force: boolean = false): void {
    this.isManuallyDisconnected = true

    const client = this.client
    if (!client) return

    client.end(force, () => {
      console.log('MQTT 连接已断开')
      this.isConnected = false
      // Only release the reference if it still points at the client being closed.
      if (this.client === client) {
        this.client = null
      }
    })
  }

  /**
   * Report the connection state tracked from the client's events.
   *
   * @returns `true` while the client is considered connected.
   */
  isConnectedToBroker(): boolean {
    return this.isConnected
  }

  /**
   * @returns Number of reconnect attempts since the last successful connection or reset.
   */
  getReconnectCount(): number {
    return this.reconnectCount
  }

  /**
   * Reset the reconnect counter to zero.
   */
  resetReconnectCount(): void {
    this.reconnectCount = 0
  }
}
|
|
|
|
export default MQTT
|