提交代码
This commit is contained in:
247
src/utils/mqtt.ts
Normal file
247
src/utils/mqtt.ts
Normal file
@@ -0,0 +1,247 @@
|
||||
import type { MqttClient, OnMessageCallback, IClientOptions, IClientSubscribeOptions } from 'mqtt'
|
||||
import mqtt from 'mqtt'
|
||||
|
||||
interface MQTTOptions {
|
||||
protocolId?: string
|
||||
qos?: 0 | 1 | 2
|
||||
clean?: boolean
|
||||
connectTimeout?: number
|
||||
clientId?: string
|
||||
username?: string
|
||||
password?: string
|
||||
reconnectPeriod?: number // 重连间隔(ms)
|
||||
maxReconnectTimes?: number // 最大重连次数
|
||||
}
|
||||
|
||||
class MQTT {
|
||||
private topic: string
|
||||
private client: MqttClient | null = null
|
||||
private isConnected: boolean = false
|
||||
private reconnectCount: number = 0
|
||||
private maxReconnectTimes: number
|
||||
private reconnectPeriod: number
|
||||
private isManuallyDisconnected: boolean = false
|
||||
private defaultOptions: MQTTOptions = {
|
||||
protocolId: 'MQTT',
|
||||
qos: 1,
|
||||
clean: true,
|
||||
connectTimeout: 30 * 1000,
|
||||
clientId: `mqttjs_${Math.random().toString(16).substr(2, 8)}`,
|
||||
username: 't_user',
|
||||
password: 'njcnpqs',
|
||||
reconnectPeriod: 1000, // 默认1秒重试一次
|
||||
maxReconnectTimes: 3 // 默认最大重连5次
|
||||
}
|
||||
|
||||
constructor(topic: string, options: MQTTOptions = {}) {
|
||||
this.topic = topic
|
||||
this.maxReconnectTimes = options.maxReconnectTimes || this.defaultOptions.maxReconnectTimes!
|
||||
this.reconnectPeriod = options.reconnectPeriod || this.defaultOptions.reconnectPeriod!
|
||||
|
||||
// 合并选项
|
||||
this.defaultOptions = { ...this.defaultOptions, ...options }
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化 MQTT 客户端
|
||||
* @returns Promise<void>
|
||||
*/
|
||||
async init(): Promise<void> {
|
||||
if (this.client) {
|
||||
throw new Error('MQTT 客户端已初始化')
|
||||
}
|
||||
|
||||
try {
|
||||
// const mqttUrl = 'ws://192.168.1.103:8083/mqtt'
|
||||
const mqttUrl =
|
||||
localStorage.getItem('MqttUrl') == 'null'
|
||||
? 'ws://192.168.1.24:8085/mqtt'
|
||||
: localStorage.getItem('MqttUrl')
|
||||
console.log('🚀 ~ MQTT ~ init ~ mqttUrl:', mqttUrl)
|
||||
this.client = mqtt.connect(mqttUrl, this.defaultOptions as IClientOptions)
|
||||
this.setupEventListeners()
|
||||
|
||||
// 等待连接成功或超时
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
if (!this.isConnected) {
|
||||
reject(new Error('MQTT 连接超时'))
|
||||
}
|
||||
}, this.defaultOptions.connectTimeout)
|
||||
|
||||
this.client?.on('connect', () => {
|
||||
clearTimeout(timeout)
|
||||
this.isConnected = true
|
||||
this.reconnectCount = 0 // 连接成功重置重连计数
|
||||
resolve()
|
||||
})
|
||||
|
||||
this.client?.on('error', error => {
|
||||
clearTimeout(timeout)
|
||||
console.error('MQTT 连接错误:', error)
|
||||
reject(error)
|
||||
})
|
||||
})
|
||||
} catch (error) {
|
||||
console.error('初始化 MQTT 失败:', error)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置事件监听器
|
||||
*/
|
||||
private setupEventListeners(): void {
|
||||
if (!this.client) return
|
||||
|
||||
this.client.on('close', () => {
|
||||
console.log('MQTT 连接已关闭')
|
||||
this.isConnected = false
|
||||
})
|
||||
|
||||
this.client.on('offline', () => {
|
||||
console.log('MQTT 客户端离线')
|
||||
this.isConnected = false
|
||||
})
|
||||
|
||||
this.client.on('reconnect', () => {
|
||||
console.log(`MQTT 正在尝试重连 (${this.reconnectCount + 1}/${this.maxReconnectTimes})...`)
|
||||
|
||||
// 检查是否超过最大重连次数
|
||||
if (this.reconnectCount >= this.maxReconnectTimes) {
|
||||
console.log('已达到最大重连次数,停止重连')
|
||||
this.client?.end(true)
|
||||
this.client = null
|
||||
return
|
||||
}
|
||||
|
||||
this.reconnectCount++
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* 订阅主题
|
||||
* @param subscribeOptions 可选的订阅选项
|
||||
* @returns Promise<void>
|
||||
*/
|
||||
async subscribe(subscribe: string, subscribeOptions: IClientSubscribeOptions = { qos: 1 }): Promise<void> {
|
||||
if (!this.client || !this.isConnected) {
|
||||
throw new Error('MQTT 客户端未连接')
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.client?.subscribe(subscribe, subscribeOptions, error => {
|
||||
if (error) {
|
||||
console.error('订阅失败:', error)
|
||||
reject(error)
|
||||
} else {
|
||||
console.log('订阅成功')
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消订阅
|
||||
* @returns Promise<void>
|
||||
*/
|
||||
async unsubscribe(subscribe: string): Promise<void> {
|
||||
if (!this.client || !this.isConnected) {
|
||||
throw new Error('MQTT 客户端未连接')
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.client?.unsubscribe(subscribe, error => {
|
||||
if (error) {
|
||||
console.error('取消订阅失败:', error)
|
||||
reject(error)
|
||||
} else {
|
||||
console.log('取消订阅成功')
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置消息回调
|
||||
* @param callback 消息回调函数
|
||||
*/
|
||||
onMessage(callback: OnMessageCallback): void {
|
||||
if (!this.client) {
|
||||
throw new Error('MQTT 客户端未初始化')
|
||||
}
|
||||
this.client.on('message', callback)
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布消息
|
||||
* topic: string, 发送地址
|
||||
* @param message 要发布的消息
|
||||
* @param options 发布选项
|
||||
* @returns Promise<void>
|
||||
*/
|
||||
async publish(
|
||||
topic: string,
|
||||
message: any,
|
||||
options: { qos?: 0 | 1 | 2; retain?: boolean } = { qos: 1 }
|
||||
): Promise<void> {
|
||||
if (!this.client || !this.isConnected) {
|
||||
throw new Error('MQTT 客户端未连接')
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.client?.publish(topic, message, options, error => {
|
||||
if (error) {
|
||||
console.error('消息发布失败:', error)
|
||||
reject(error)
|
||||
} else {
|
||||
console.log('消息发布成功')
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* 断开连接
|
||||
* @param force 是否强制断开
|
||||
*/
|
||||
disconnect(force: boolean = false): void {
|
||||
this.isManuallyDisconnected = true
|
||||
|
||||
if (this.client) {
|
||||
this.client.end(force, () => {
|
||||
console.log('MQTT 连接已断开')
|
||||
this.isConnected = false
|
||||
this.client = null
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查连接状态
|
||||
* @returns boolean
|
||||
*/
|
||||
isConnectedToBroker(): boolean {
|
||||
return this.isConnected
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前重连次数
|
||||
* @returns number
|
||||
*/
|
||||
getReconnectCount(): number {
|
||||
return this.reconnectCount
|
||||
}
|
||||
|
||||
/**
|
||||
* 重置重连计数器
|
||||
*/
|
||||
resetReconnectCount(): void {
|
||||
this.reconnectCount = 0
|
||||
}
|
||||
}
|
||||
|
||||
export default MQTT
|
||||
Reference in New Issue
Block a user