Files
svgeditor2.0/src/utils/mqtt.ts

248 lines
7.2 KiB
TypeScript
Raw Normal View History

2025-09-25 11:34:55 +08:00
import type { MqttClient, OnMessageCallback, IClientOptions, IClientSubscribeOptions } from 'mqtt'
import mqtt from 'mqtt'
interface MQTTOptions {
protocolId?: string
qos?: 0 | 1 | 2
clean?: boolean
connectTimeout?: number
clientId?: string
username?: string
password?: string
reconnectPeriod?: number // 重连间隔(ms)
maxReconnectTimes?: number // 最大重连次数
}
class MQTT {
private topic: string
private client: MqttClient | null = null
private isConnected: boolean = false
private reconnectCount: number = 0
private maxReconnectTimes: number
private reconnectPeriod: number
private isManuallyDisconnected: boolean = false
private defaultOptions: MQTTOptions = {
protocolId: 'MQTT',
qos: 1,
clean: true,
connectTimeout: 30 * 1000,
clientId: `mqttjs_${Math.random().toString(16).substr(2, 8)}`,
username: 't_user',
password: 'njcnpqs',
reconnectPeriod: 1000, // 默认1秒重试一次
maxReconnectTimes: 3 // 默认最大重连5次
}
constructor(topic: string, options: MQTTOptions = {}) {
this.topic = topic
this.maxReconnectTimes = options.maxReconnectTimes || this.defaultOptions.maxReconnectTimes!
this.reconnectPeriod = options.reconnectPeriod || this.defaultOptions.reconnectPeriod!
// 合并选项
this.defaultOptions = { ...this.defaultOptions, ...options }
}
/**
* MQTT
* @returns Promise<void>
*/
async init(): Promise<void> {
if (this.client) {
throw new Error('MQTT 客户端已初始化')
}
try {
// const mqttUrl = 'ws://192.168.1.103:8083/mqtt'
const mqttUrl =
localStorage.getItem('MqttUrl') == 'null'
? 'ws://192.168.1.24:8085/mqtt'
: localStorage.getItem('MqttUrl')
console.log('🚀 ~ MQTT ~ init ~ mqttUrl:', mqttUrl)
this.client = mqtt.connect(mqttUrl, this.defaultOptions as IClientOptions)
this.setupEventListeners()
// 等待连接成功或超时
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
if (!this.isConnected) {
reject(new Error('MQTT 连接超时'))
}
}, this.defaultOptions.connectTimeout)
this.client?.on('connect', () => {
clearTimeout(timeout)
this.isConnected = true
this.reconnectCount = 0 // 连接成功重置重连计数
resolve()
})
this.client?.on('error', error => {
clearTimeout(timeout)
console.error('MQTT 连接错误:', error)
reject(error)
})
})
} catch (error) {
console.error('初始化 MQTT 失败:', error)
throw error
}
}
/**
*
*/
private setupEventListeners(): void {
if (!this.client) return
this.client.on('close', () => {
console.log('MQTT 连接已关闭')
this.isConnected = false
})
this.client.on('offline', () => {
console.log('MQTT 客户端离线')
this.isConnected = false
})
this.client.on('reconnect', () => {
console.log(`MQTT 正在尝试重连 (${this.reconnectCount + 1}/${this.maxReconnectTimes})...`)
// 检查是否超过最大重连次数
if (this.reconnectCount >= this.maxReconnectTimes) {
console.log('已达到最大重连次数,停止重连')
this.client?.end(true)
this.client = null
return
}
this.reconnectCount++
})
}
/**
*
* @param subscribeOptions
* @returns Promise<void>
*/
async subscribe(subscribe: string, subscribeOptions: IClientSubscribeOptions = { qos: 1 }): Promise<void> {
if (!this.client || !this.isConnected) {
throw new Error('MQTT 客户端未连接')
}
return new Promise((resolve, reject) => {
this.client?.subscribe(subscribe, subscribeOptions, error => {
if (error) {
console.error('订阅失败:', error)
reject(error)
} else {
console.log('订阅成功')
resolve()
}
})
})
}
/**
*
* @returns Promise<void>
*/
async unsubscribe(subscribe: string): Promise<void> {
if (!this.client || !this.isConnected) {
throw new Error('MQTT 客户端未连接')
}
return new Promise((resolve, reject) => {
this.client?.unsubscribe(subscribe, error => {
if (error) {
console.error('取消订阅失败:', error)
reject(error)
} else {
console.log('取消订阅成功')
resolve()
}
})
})
}
/**
*
* @param callback
*/
onMessage(callback: OnMessageCallback): void {
if (!this.client) {
throw new Error('MQTT 客户端未初始化')
}
this.client.on('message', callback)
}
/**
*
* topic: string,
* @param message
* @param options
* @returns Promise<void>
*/
async publish(
topic: string,
message: any,
options: { qos?: 0 | 1 | 2; retain?: boolean } = { qos: 1 }
): Promise<void> {
if (!this.client || !this.isConnected) {
throw new Error('MQTT 客户端未连接')
}
return new Promise((resolve, reject) => {
this.client?.publish(topic, message, options, error => {
if (error) {
console.error('消息发布失败:', error)
reject(error)
} else {
console.log('消息发布成功')
resolve()
}
})
})
}
/**
*
* @param force
*/
disconnect(force: boolean = false): void {
this.isManuallyDisconnected = true
if (this.client) {
this.client.end(force, () => {
console.log('MQTT 连接已断开')
this.isConnected = false
this.client = null
})
}
}
/**
*
* @returns boolean
*/
isConnectedToBroker(): boolean {
return this.isConnected
}
/**
*
* @returns number
*/
getReconnectCount(): number {
return this.reconnectCount
}
/**
*
*/
resetReconnectCount(): void {
this.reconnectCount = 0
}
}
export default MQTT