2026-03-11 08:47:54 +08:00
|
|
|
|
/**
|
|
|
|
|
|
* MQTT 客户端插件 for uni-app
|
|
|
|
|
|
* 基于 WebSocket 实现 MQTT 协议
|
|
|
|
|
|
*/
|
|
|
|
|
|
class MQTTClient {
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 构造函数
|
|
|
|
|
|
* @param {Object} options 配置选项
|
|
|
|
|
|
*/
|
|
|
|
|
|
constructor(options = {}) {
|
|
|
|
|
|
|
|
|
|
|
|
this.reSetconstructor(options);
|
|
|
|
|
|
// 事件监听器
|
|
|
|
|
|
this.eventListeners = {
|
|
|
|
|
|
connect: [],
|
|
|
|
|
|
disconnect: [],
|
|
|
|
|
|
error: [],
|
|
|
|
|
|
message: [],
|
|
|
|
|
|
statusChange: []
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// 绑定方法
|
|
|
|
|
|
this.on = this.on.bind(this);
|
|
|
|
|
|
this.off = this.off.bind(this);
|
|
|
|
|
|
this.connect = this.connect.bind(this);
|
|
|
|
|
|
this.disconnect = this.disconnect.bind(this);
|
|
|
|
|
|
this.publish = this.publish.bind(this);
|
|
|
|
|
|
this.subscribe = this.subscribe.bind(this);
|
|
|
|
|
|
this.unsubscribe = this.unsubscribe.bind(this);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
reSetconstructor(options = {}) {
|
|
|
|
|
|
// 默认配置
|
|
|
|
|
|
this.config = {
|
|
|
|
|
|
path: '/ws',
|
|
|
|
|
|
protocolVersion: 4, // 4 for MQTT 3.1.1
|
|
|
|
|
|
clean: false,
|
|
|
|
|
|
cleanStart: false, // for MQTT 5.0
|
|
|
|
|
|
keepAlive: 60,
|
|
|
|
|
|
sessionExpiryInterval: 4294967295,
|
|
|
|
|
|
connectTimeout: 10000,
|
|
|
|
|
|
reconnectPeriod: 10000
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// 合并用户配置
|
|
|
|
|
|
Object.assign(this.config, options);
|
|
|
|
|
|
|
|
|
|
|
|
// 状态
|
|
|
|
|
|
this.socket = null;
|
|
|
|
|
|
this.connectionStatus = 'disconnected'; // 'disconnected', 'connecting', 'connected', 'error'
|
|
|
|
|
|
this.subscribedTopics = {};
|
|
|
|
|
|
this.messageId = 1;
|
|
|
|
|
|
this.connectTimeoutTimer = null;
|
|
|
|
|
|
this.reconnectTimer = null;
|
|
|
|
|
|
this.pingIntervalTimer = null;
|
|
|
|
|
|
this.pingTimeoutTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 添加事件监听器
|
|
|
|
|
|
* @param {string} event 事件名称: 'connect', 'disconnect', 'error', 'message', 'statusChange'
|
|
|
|
|
|
* @param {Function} callback 回调函数
|
|
|
|
|
|
*/
|
|
|
|
|
|
on(event, callback) {
|
|
|
|
|
|
if (this.eventListeners[event]) {
|
|
|
|
|
|
this.eventListeners[event].push(callback);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 移除事件监听器
|
|
|
|
|
|
* @param {string} event 事件名称
|
|
|
|
|
|
* @param {Function} callback 回调函数
|
|
|
|
|
|
*/
|
|
|
|
|
|
off(event, callback) {
|
|
|
|
|
|
if (this.eventListeners[event]) {
|
|
|
|
|
|
const index = this.eventListeners[event].indexOf(callback);
|
|
|
|
|
|
if (index > -1) {
|
|
|
|
|
|
this.eventListeners[event].splice(index, 1);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 触发事件
|
|
|
|
|
|
* @param {string} event 事件名称
|
|
|
|
|
|
* @param {...any} args 参数
|
|
|
|
|
|
*/
|
|
|
|
|
|
emit(event, ...args) {
|
|
|
|
|
|
if (this.eventListeners[event]) {
|
|
|
|
|
|
this.eventListeners[event].forEach(callback => {
|
|
|
|
|
|
try {
|
|
|
|
|
|
callback(...args);
|
|
|
|
|
|
} catch (err) {
|
|
|
|
|
|
console.error(`Error in ${event} event listener:`, err);
|
|
|
|
|
|
}
|
|
|
|
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 更新连接状态
|
|
|
|
|
|
* @param {string} status 新状态
|
|
|
|
|
|
*/
|
|
|
|
|
|
setConnectionStatus(status) {
|
|
|
|
|
|
if (this.connectionStatus !== status) {
|
|
|
|
|
|
this.connectionStatus = status;
|
|
|
|
|
|
this.emit('statusChange', status);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 连接 MQTT 服务器
|
|
|
|
|
|
*/
|
|
|
|
|
|
connect() {
|
|
|
|
|
|
const {
|
|
|
|
|
|
host,
|
|
|
|
|
|
port,
|
|
|
|
|
|
path,
|
|
|
|
|
|
clientId,
|
|
|
|
|
|
username,
|
|
|
|
|
|
password,
|
|
|
|
|
|
protocolVersion,
|
|
|
|
|
|
clean,
|
|
|
|
|
|
cleanStart,
|
|
|
|
|
|
keepAlive,
|
|
|
|
|
|
sessionExpiryInterval,
|
|
|
|
|
|
connectTimeout
|
|
|
|
|
|
} = this.config;
|
|
|
|
|
|
|
|
|
|
|
|
console.log("连接参数:", this.config)
|
|
|
|
|
|
if (!host) {
|
|
|
|
|
|
this.emit('error', new Error('请输入主机地址'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
this.setConnectionStatus('connecting');
|
|
|
|
|
|
this.emit('statusChange', 'connecting');
|
|
|
|
|
|
|
|
|
|
|
|
// 构建 WebSocket URL
|
|
|
|
|
|
let urlPath = path;
|
|
|
|
|
|
if (!urlPath.startsWith('/')) {
|
|
|
|
|
|
urlPath = '/' + urlPath;
|
|
|
|
|
|
}
|
|
|
|
|
|
let url = `ws://${host}:${port}${urlPath}`;
|
|
|
|
|
|
|
|
|
|
|
|
// 关闭现有连接
|
|
|
|
|
|
if (this.socket) {
|
|
|
|
|
|
this.socket.close();
|
|
|
|
|
|
this.socket = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
const ver = protocolVersion === 5 ? '5.0' : protocolVersion === 4 ? '3.1.1' : '3.1';
|
|
|
|
|
|
// 建立 WebSocket 连接
|
|
|
|
|
|
this.socket = uni.connectSocket({
|
|
|
|
|
|
url,
|
|
|
|
|
|
protocols: ['mqtt', 'mqttv3.1.1'],
|
|
|
|
|
|
header: {
|
|
|
|
|
|
'content-type': 'application/octet-stream'
|
|
|
|
|
|
},
|
|
|
|
|
|
success: () => {
|
|
|
|
|
|
// 连接请求已发送
|
|
|
|
|
|
},
|
|
|
|
|
|
fail: (err) => {
|
|
|
|
|
|
this.setConnectionStatus('error');
|
|
|
|
|
|
this.emit('error', new Error(`WebSocket 连接失败: ${JSON.stringify(err)}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
// 设置连接超时
|
|
|
|
|
|
if (connectTimeout > 0) {
|
|
|
|
|
|
this.connectTimeoutTimer = setTimeout(() => {
|
|
|
|
|
|
if (this.connectionStatus === 'connecting') {
|
|
|
|
|
|
this.setConnectionStatus('error');
|
|
|
|
|
|
this.emit('error', new Error(`连接超时 (${connectTimeout}ms)`));
|
|
|
|
|
|
if (this.socket) {
|
|
|
|
|
|
this.socket.close();
|
|
|
|
|
|
this.socket = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}, connectTimeout);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// WebSocket 事件监听
|
|
|
|
|
|
this.socket.onOpen(() => {
|
|
|
|
|
|
console.log('[MQTT调试] WebSocket 连接已打开');
|
|
|
|
|
|
if (this.connectTimeoutTimer) {
|
|
|
|
|
|
clearTimeout(this.connectTimeoutTimer);
|
|
|
|
|
|
this.connectTimeoutTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
this.sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart,
|
|
|
|
|
|
keepAlive, sessionExpiryInterval);
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
this.socket.onMessage((res) => {
|
|
|
|
|
|
this.handleMQTTMessage(res.data);
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
this.socket.onError((err) => {
|
|
|
|
|
|
if (this.connectTimeoutTimer) {
|
|
|
|
|
|
clearTimeout(this.connectTimeoutTimer);
|
|
|
|
|
|
this.connectTimeoutTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
this.stopPingInterval();
|
|
|
|
|
|
this.setConnectionStatus('error');
|
|
|
|
|
|
this.emit('error', new Error(`WebSocket 错误: ${JSON.stringify(err)}`));
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
this.socket.onClose((res) => {
|
|
|
|
|
|
console.log('[MQTT调试] WebSocket 关闭,原因:', res);
|
|
|
|
|
|
if (this.connectTimeoutTimer) {
|
|
|
|
|
|
clearTimeout(this.connectTimeoutTimer);
|
|
|
|
|
|
this.connectTimeoutTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
this.stopPingInterval();
|
|
|
|
|
|
|
|
|
|
|
|
if (this.connectionStatus === 'connected' && this.config.reconnectPeriod > 0) {
|
|
|
|
|
|
this.reconnectTimer = setTimeout(() => {
|
|
|
|
|
|
this.connect();
|
|
|
|
|
|
}, this.config.reconnectPeriod);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
this.setConnectionStatus('disconnected');
|
|
|
|
|
|
this.emit('disconnect', res);
|
|
|
|
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 断开连接
|
|
|
|
|
|
*/
|
|
|
|
|
|
disconnect() {
|
|
|
|
|
|
if (this.connectTimeoutTimer) {
|
|
|
|
|
|
clearTimeout(this.connectTimeoutTimer);
|
|
|
|
|
|
this.connectTimeoutTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
if (this.reconnectTimer) {
|
|
|
|
|
|
clearTimeout(this.reconnectTimer);
|
|
|
|
|
|
this.reconnectTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
// 清除心跳超时定时器
|
|
|
|
|
|
if (this.pingTimeoutTimer) {
|
|
|
|
|
|
clearTimeout(this.pingTimeoutTimer);
|
|
|
|
|
|
this.pingTimeoutTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
this.stopPingInterval();
|
|
|
|
|
|
|
|
|
|
|
|
if (this.socket) {
|
|
|
|
|
|
this.sendDisconnectPacket();
|
|
|
|
|
|
setTimeout(() => {
|
|
|
|
|
|
this.socket.close();
|
|
|
|
|
|
this.socket = null;
|
|
|
|
|
|
this.setConnectionStatus('disconnected');
|
|
|
|
|
|
this.subscribedTopics = {};
|
|
|
|
|
|
}, 100);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发布消息
|
|
|
|
|
|
* @param {string} topic 主题
|
|
|
|
|
|
* @param {string} message 消息内容
|
|
|
|
|
|
* @param {number} qos QoS 等级 (0, 1, 2)
|
|
|
|
|
|
*/
|
|
|
|
|
|
publish(topic, message, qos = 0) {
|
|
|
|
|
|
if (this.connectionStatus !== 'connected') {
|
|
|
|
|
|
this.emit('error', new Error('请先连接'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (!topic) {
|
|
|
|
|
|
this.emit('error', new Error('请输入主题'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
this.sendPublishPacket(topic, message, qos);
|
|
|
|
|
|
this.emit('message', {
|
|
|
|
|
|
type: 'send',
|
|
|
|
|
|
topic,
|
|
|
|
|
|
message
|
|
|
|
|
|
});
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
this.emit('error', new Error(`发布失败: ${error.message}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 订阅主题
|
|
|
|
|
|
* @param {string} topic 主题
|
|
|
|
|
|
* @param {number} qos QoS 等级 (0, 1, 2)
|
|
|
|
|
|
*/
|
|
|
|
|
|
subscribe(topic, qos = 0) {
|
|
|
|
|
|
if (this.connectionStatus !== 'connected') {
|
|
|
|
|
|
this.emit('error', new Error('请先连接'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (!topic) {
|
|
|
|
|
|
this.emit('error', new Error('请输入订阅主题'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
this.sendSubscribePacket(topic, qos);
|
|
|
|
|
|
this.subscribedTopics[topic] = qos;
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
this.emit('error', new Error(`订阅失败: ${error.message}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 取消订阅
|
|
|
|
|
|
* @param {string} topic 主题
|
|
|
|
|
|
*/
|
|
|
|
|
|
unsubscribe(topic) {
|
|
|
|
|
|
if (this.connectionStatus !== 'connected') {
|
|
|
|
|
|
this.emit('error', new Error('请先连接'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (!topic) {
|
|
|
|
|
|
this.emit('error', new Error('请输入取消订阅的主题'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
this.sendUnsubscribePacket(topic);
|
|
|
|
|
|
delete this.subscribedTopics[topic];
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
this.emit('error', new Error(`取消订阅失败: ${error.message}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取当前连接状态
|
|
|
|
|
|
* @returns {string} 连接状态
|
|
|
|
|
|
*/
|
|
|
|
|
|
getStatus() {
|
|
|
|
|
|
return this.connectionStatus;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 获取已订阅的主题列表
|
|
|
|
|
|
* @returns {Array} 主题列表
|
|
|
|
|
|
*/
|
|
|
|
|
|
getSubscribedTopics() {
|
|
|
|
|
|
return Object.keys(this.subscribedTopics);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ========== 内部方法 ==========
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送数据包
|
|
|
|
|
|
* @param {string|ArrayBuffer|Array} packet 数据包
|
|
|
|
|
|
*/
|
|
|
|
|
|
sendPacket(packet) {
|
|
|
|
|
|
if (!this.socket) {
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
console.log("sendPacket", packet)
|
|
|
|
|
|
let arrayBuffer;
|
|
|
|
|
|
if (typeof packet === 'string') {
|
|
|
|
|
|
const bytes = this.ab2buffer(packet);
|
|
|
|
|
|
arrayBuffer = new Uint8Array(bytes).buffer;
|
|
|
|
|
|
// 调试日志:记录发送的数据包
|
|
|
|
|
|
console.log('MQTT 发送数据包(字符串):', packet.length, '字符,字节:', bytes.length);
|
|
|
|
|
|
console.log('数据包字节(前64字节):', bytes.slice(0, 64).map(b => b.toString(16).padStart(2, '0')).join(' '));
|
|
|
|
|
|
} else if (packet instanceof ArrayBuffer) {
|
|
|
|
|
|
arrayBuffer = packet;
|
|
|
|
|
|
console.log('MQTT 发送数据包(ArrayBuffer):', arrayBuffer.byteLength, '字节');
|
|
|
|
|
|
} else if (Array.isArray(packet)) {
|
|
|
|
|
|
arrayBuffer = new Uint8Array(packet).buffer;
|
|
|
|
|
|
console.log('MQTT 发送数据包(数组):', packet.length, '字节');
|
|
|
|
|
|
} else {
|
|
|
|
|
|
this.emit('error', new Error('未知的数据包格式'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
uni.sendSocketMessage({
|
|
|
|
|
|
data: arrayBuffer,
|
|
|
|
|
|
success: () => {
|
|
|
|
|
|
// 发送成功
|
|
|
|
|
|
console.log('MQTT 数据包发送成功');
|
|
|
|
|
|
},
|
|
|
|
|
|
fail: (err) => {
|
|
|
|
|
|
console.error('MQTT 数据包发送失败:', err);
|
|
|
|
|
|
this.emit('error', new Error(`数据发送失败: ${JSON.stringify(err)}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理 MQTT 消息
|
|
|
|
|
|
* @param {*} data 原始数据
|
|
|
|
|
|
*/
|
|
|
|
|
|
handleMQTTMessage(data) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
console.log('[MQTT调试] 收到原始消息:', typeof data, data instanceof ArrayBuffer, data.byteLength || data
|
|
|
|
|
|
.length);
|
|
|
|
|
|
|
|
|
|
|
|
const buffer = this.ab2buffer(data);
|
|
|
|
|
|
console.log('[MQTT调试] 转换后的 buffer 长度:', buffer.length);
|
|
|
|
|
|
if (buffer.length > 0) {
|
|
|
|
|
|
const hexStr = buffer.slice(0, Math.min(32, buffer.length)).map(b => b.toString(16).padStart(2,
|
|
|
|
|
|
'0')).join(' ');
|
|
|
|
|
|
console.log('[MQTT调试] buffer 前32字节(hex):', hexStr);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (buffer.length === 0) {
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
const packetType = buffer[0] & 0xF0;
|
|
|
|
|
|
const flags = buffer[0] & 0x0F;
|
|
|
|
|
|
|
|
|
|
|
|
console.log('[MQTT调试] 数据包类型:', '0x' + packetType.toString(16).padStart(2, '0'), '标志位:', flags);
|
|
|
|
|
|
|
|
|
|
|
|
let pos = 1;
|
|
|
|
|
|
let remainingLength = 0;
|
|
|
|
|
|
let multiplier = 1;
|
|
|
|
|
|
let encodedByte;
|
|
|
|
|
|
|
|
|
|
|
|
do {
|
|
|
|
|
|
if (pos >= buffer.length) {
|
|
|
|
|
|
console.log('[MQTT调试] 剩余长度编码不完整');
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
encodedByte = buffer[pos];
|
|
|
|
|
|
remainingLength += (encodedByte & 0x7F) * multiplier;
|
|
|
|
|
|
multiplier *= 128;
|
|
|
|
|
|
pos++;
|
|
|
|
|
|
} while ((encodedByte & 0x80) !== 0 && pos < buffer.length);
|
|
|
|
|
|
|
|
|
|
|
|
console.log('[MQTT调试] 剩余长度:', remainingLength, '当前位置:', pos);
|
|
|
|
|
|
|
|
|
|
|
|
if (packetType === 0x20) {
|
|
|
|
|
|
console.log('[MQTT调试] 处理 CONNACK 包');
|
|
|
|
|
|
this.handleConnackPacket(buffer, pos);
|
|
|
|
|
|
} else if (packetType === 0x30 || packetType === 0x32) {
|
|
|
|
|
|
console.log('[MQTT调试] 处理 PUBLISH 包');
|
|
|
|
|
|
this.handlePublishPacket(buffer, pos, flags);
|
|
|
|
|
|
} else if (packetType === 0x40) {
|
|
|
|
|
|
console.log('[MQTT调试] 处理 PUBACK 包');
|
|
|
|
|
|
this.handlePubackPacket(buffer, pos);
|
|
|
|
|
|
} else if (packetType === 0x90) {
|
|
|
|
|
|
console.log('[MQTT调试] 处理 SUBACK 包');
|
|
|
|
|
|
this.handleSubackPacket(buffer, pos);
|
|
|
|
|
|
} else if (packetType === 0xB0) {
|
|
|
|
|
|
console.log('[MQTT调试] 处理 UNSUBACK 包');
|
|
|
|
|
|
this.handleUnsubackPacket(buffer, pos);
|
|
|
|
|
|
} else if (packetType === 0xD0) {
|
|
|
|
|
|
console.log('[MQTT调试] 处理 PINGRESP 包');
|
|
|
|
|
|
// 清除心跳响应超时定时器
|
|
|
|
|
|
if (this.pingTimeoutTimer) {
|
|
|
|
|
|
clearTimeout(this.pingTimeoutTimer);
|
|
|
|
|
|
this.pingTimeoutTimer = null;
|
|
|
|
|
|
console.log('[MQTT调试] 心跳响应超时定时器已清除');
|
|
|
|
|
|
}
|
|
|
|
|
|
} else if (packetType === 0xE0) {
|
|
|
|
|
|
console.log('[MQTT调试] 处理 DISCONNECT 包');
|
|
|
|
|
|
this.handleDisconnectPacket(buffer, pos);
|
|
|
|
|
|
} else {
|
|
|
|
|
|
console.log('[MQTT调试] 未知数据包类型:', packetType.toString(16));
|
|
|
|
|
|
}
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
console.error('[MQTT调试] 处理 MQTT 消息时出错:', error);
|
|
|
|
|
|
this.emit('error', new Error(`处理 MQTT 消息时出错: ${error.message}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理 CONNACK 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
handleConnackPacket(buffer, pos) {
|
|
|
|
|
|
if (buffer.length < pos + 2) {
|
|
|
|
|
|
this.emit('error', new Error('CONNACK 包长度不足'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
const flags = buffer[pos++];
|
|
|
|
|
|
const reasonCode = buffer[pos++];
|
|
|
|
|
|
|
|
|
|
|
|
const reasonCodes = {
|
|
|
|
|
|
0: '成功',
|
|
|
|
|
|
128: '协议版本错误',
|
|
|
|
|
|
129: '客户端ID被拒绝',
|
|
|
|
|
|
130: '服务器不可用',
|
|
|
|
|
|
131: '用户名密码错误',
|
|
|
|
|
|
132: '未授权',
|
|
|
|
|
|
133: '服务器未找到',
|
|
|
|
|
|
134: '服务器不可用',
|
|
|
|
|
|
135: '用户名密码无效'
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
const reasonText = reasonCodes[reasonCode] || `未知错误 (${reasonCode})`;
|
|
|
|
|
|
|
|
|
|
|
|
if (reasonCode === 0) {
|
|
|
|
|
|
console.log('[MQTT调试] CONNACK 成功,设置连接状态为 connected');
|
|
|
|
|
|
this.setConnectionStatus('connected');
|
|
|
|
|
|
this.emit('connect');
|
|
|
|
|
|
this.startPingInterval();
|
|
|
|
|
|
|
|
|
|
|
|
// 重新订阅之前订阅的主题
|
|
|
|
|
|
if (Object.keys(this.subscribedTopics).length > 0) {
|
|
|
|
|
|
console.log('[MQTT调试] 重新订阅之前订阅的主题');
|
|
|
|
|
|
for (const topic in this.subscribedTopics) {
|
|
|
|
|
|
const qos = this.subscribedTopics[topic];
|
|
|
|
|
|
try {
|
|
|
|
|
|
this.sendSubscribePacket(topic, qos);
|
|
|
|
|
|
console.log(`[MQTT调试] 重新订阅主题: ${topic}, QoS: ${qos}`);
|
|
|
|
|
|
} catch (err) {
|
|
|
|
|
|
console.error(`[MQTT调试] 重新订阅主题失败: ${topic}`, err);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
|
|
|
this.setConnectionStatus('error');
|
|
|
|
|
|
this.emit('error', new Error(`MQTT 连接失败: ${reasonText}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理 PUBLISH 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
handlePublishPacket(buffer, pos, flags) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
console.log('[MQTT调试] 开始处理 PUBLISH 包,buffer 长度:', buffer.length, '起始位置:', pos, '标志位:', flags);
|
|
|
|
|
|
|
|
|
|
|
|
const qos = (flags >> 1) & 0x03;
|
|
|
|
|
|
console.log('[MQTT调试] 解析出的 QoS:', qos);
|
|
|
|
|
|
|
|
|
|
|
|
if (buffer.length < pos + 2) {
|
|
|
|
|
|
console.error('[MQTT调试] PUBLISH 包主题长度字段不完整');
|
|
|
|
|
|
this.emit('error', new Error('PUBLISH 包主题长度字段不完整'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
const topicLen = buffer[pos++] * 256 + buffer[pos++];
|
|
|
|
|
|
console.log('[MQTT调试] 主题长度:', topicLen);
|
|
|
|
|
|
|
|
|
|
|
|
if (buffer.length < pos + topicLen) {
|
|
|
|
|
|
console.error('[MQTT调试] PUBLISH 包主题数据不完整');
|
|
|
|
|
|
this.emit('error', new Error('PUBLISH 包主题数据不完整'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
const topicBytes = buffer.slice(pos, pos + topicLen);
|
|
|
|
|
|
const topicHex = topicBytes.slice(0, Math.min(32, topicBytes.length)).map(b => b.toString(16).padStart(
|
|
|
|
|
|
2, '0')).join(' ');
|
|
|
|
|
|
console.log('[MQTT调试] 主题原始字节(hex):', topicHex);
|
|
|
|
|
|
const topic = this.bytesToString(topicBytes);
|
|
|
|
|
|
console.log('[MQTT调试] 解析出的主题:', topic);
|
|
|
|
|
|
pos += topicLen;
|
|
|
|
|
|
|
|
|
|
|
|
let packetId = 0;
|
|
|
|
|
|
if (qos > 0) {
|
|
|
|
|
|
if (buffer.length < pos + 2) {
|
|
|
|
|
|
console.error('[MQTT调试] PUBLISH 包包ID字段不完整');
|
|
|
|
|
|
this.emit('error', new Error('PUBLISH 包包ID字段不完整'));
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
packetId = buffer[pos++] * 256 + buffer[pos++];
|
|
|
|
|
|
console.log('[MQTT调试] 报文标识符:', packetId);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
const payloadBytes = buffer.slice(pos);
|
|
|
|
|
|
console.log('[MQTT调试] 载荷字节长度:', payloadBytes.length);
|
|
|
|
|
|
if (payloadBytes.length > 0) {
|
|
|
|
|
|
const payloadHex = payloadBytes.slice(0, Math.min(64, payloadBytes.length)).map(b => b.toString(16)
|
|
|
|
|
|
.padStart(2, '0')).join(' ');
|
|
|
|
|
|
console.log('[MQTT调试] 载荷前64字节(hex):', payloadHex);
|
|
|
|
|
|
}
|
|
|
|
|
|
const payload = this.bytesToString(payloadBytes);
|
|
|
|
|
|
console.log('[MQTT调试] 解析出的载荷:', payload);
|
|
|
|
|
|
|
|
|
|
|
|
this.emit('message', {
|
|
|
|
|
|
type: 'receive',
|
|
|
|
|
|
topic,
|
|
|
|
|
|
payload
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
if (qos === 1) {
|
|
|
|
|
|
this.sendPubackPacket(packetId);
|
|
|
|
|
|
}
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
console.error('[MQTT调试] 处理 PUBLISH 包时出错:', error);
|
|
|
|
|
|
this.emit('error', new Error(`处理 PUBLISH 包时出错: ${error.message}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理 PUBACK 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
handlePubackPacket(buffer, pos) {
|
|
|
|
|
|
if (buffer.length >= pos + 2) {
|
|
|
|
|
|
const packetId = buffer[pos++] * 256 + buffer[pos++];
|
|
|
|
|
|
// 可以触发事件
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理 SUBACK 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
handleSubackPacket(buffer, pos) {
|
|
|
|
|
|
if (buffer.length >= pos + 2) {
|
|
|
|
|
|
const packetId = buffer[pos++] * 256 + buffer[pos++];
|
|
|
|
|
|
this.emit('subscribed', packetId);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理 UNSUBACK 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
handleUnsubackPacket(buffer, pos) {
|
|
|
|
|
|
if (buffer.length >= pos + 2) {
|
|
|
|
|
|
const packetId = buffer[pos++] * 256 + buffer[pos++];
|
|
|
|
|
|
this.emit('unsubscribed', packetId);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 处理 DISCONNECT 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
handleDisconnectPacket(buffer, pos) {
|
|
|
|
|
|
this.setConnectionStatus('disconnected');
|
|
|
|
|
|
this.emit('disconnect', {
|
|
|
|
|
|
type: 'server'
|
|
|
|
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送 CONNECT 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, keepAlive,
|
|
|
|
|
|
sessionExpiryInterval) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
console.log('[MQTT调试] 发送 CONNECT 包,clean:', clean, 'cleanStart:', cleanStart, 'protocolVersion:',
|
|
|
|
|
|
protocolVersion);
|
|
|
|
|
|
let packet = '';
|
|
|
|
|
|
packet += String.fromCharCode(0x10);
|
|
|
|
|
|
|
|
|
|
|
|
let variableHeader = '';
|
|
|
|
|
|
variableHeader += String.fromCharCode(0x00, 0x04);
|
|
|
|
|
|
variableHeader += 'MQTT';
|
|
|
|
|
|
variableHeader += String.fromCharCode(protocolVersion);
|
|
|
|
|
|
|
|
|
|
|
|
let flags = 0x00;
|
|
|
|
|
|
if (protocolVersion === 5) {
|
|
|
|
|
|
if (cleanStart) flags |= 0x02;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
if (clean) flags |= 0x02;
|
|
|
|
|
|
}
|
|
|
|
|
|
if (username) flags |= 0x80;
|
|
|
|
|
|
if (password) flags |= 0x40;
|
|
|
|
|
|
variableHeader += String.fromCharCode(flags);
|
|
|
|
|
|
|
|
|
|
|
|
variableHeader += String.fromCharCode((keepAlive >> 8) & 0xFF, keepAlive & 0xFF);
|
|
|
|
|
|
|
|
|
|
|
|
// MQTT 5.0 属性处理
|
|
|
|
|
|
if (protocolVersion === 5) {
|
|
|
|
|
|
let properties = '';
|
|
|
|
|
|
let propertyLength = 0;
|
|
|
|
|
|
|
|
|
|
|
|
// 添加会话过期间隔属性 (0x11)
|
|
|
|
|
|
if (!cleanStart && sessionExpiryInterval !== undefined) {
|
|
|
|
|
|
properties += String.fromCharCode(0x11); // 属性标识符
|
|
|
|
|
|
properties += String.fromCharCode((sessionExpiryInterval >> 24) & 0xFF);
|
|
|
|
|
|
properties += String.fromCharCode((sessionExpiryInterval >> 16) & 0xFF);
|
|
|
|
|
|
properties += String.fromCharCode((sessionExpiryInterval >> 8) & 0xFF);
|
|
|
|
|
|
properties += String.fromCharCode(sessionExpiryInterval & 0xFF);
|
|
|
|
|
|
propertyLength += 1 + 4; // 标识符 + 4字节值
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 编码属性长度为可变字节整数
|
|
|
|
|
|
let encodedPropertyLength = '';
|
|
|
|
|
|
let propLen = propertyLength;
|
|
|
|
|
|
do {
|
|
|
|
|
|
let byte = propLen % 128;
|
|
|
|
|
|
propLen = Math.floor(propLen / 128);
|
|
|
|
|
|
if (propLen > 0) {
|
|
|
|
|
|
byte |= 0x80;
|
|
|
|
|
|
}
|
|
|
|
|
|
encodedPropertyLength += String.fromCharCode(byte);
|
|
|
|
|
|
} while (propLen > 0);
|
|
|
|
|
|
|
|
|
|
|
|
variableHeader += encodedPropertyLength;
|
|
|
|
|
|
variableHeader += properties;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
let payload = this.encodeString(clientId);
|
|
|
|
|
|
if (username) payload += this.encodeString(username);
|
|
|
|
|
|
if (password) payload += this.encodeString(password);
|
|
|
|
|
|
|
|
|
|
|
|
const remainingLength = variableHeader.length + payload.length;
|
|
|
|
|
|
packet += this.encodeRemainingLength(remainingLength);
|
|
|
|
|
|
packet += variableHeader;
|
|
|
|
|
|
packet += payload;
|
|
|
|
|
|
|
|
|
|
|
|
this.sendPacket(packet);
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
this.emit('error', new Error(`发送 CONNECT 包失败: ${error.message}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送 DISCONNECT 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
sendDisconnectPacket() {
|
|
|
|
|
|
let packet = String.fromCharCode(0xE0);
|
|
|
|
|
|
packet += String.fromCharCode(0x00);
|
|
|
|
|
|
this.sendPacket(packet);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送 PUBLISH 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
sendPublishPacket(topic, message, qos) {
|
|
|
|
|
|
let packet = '';
|
|
|
|
|
|
|
|
|
|
|
|
const flags = 0x30 | (qos << 1);
|
|
|
|
|
|
packet += String.fromCharCode(flags);
|
|
|
|
|
|
|
|
|
|
|
|
const topicBytes = this.encodeString(topic);
|
|
|
|
|
|
const msgBytes = this.stringToBytes(message);
|
|
|
|
|
|
|
|
|
|
|
|
let remainingLength = topicBytes.length + msgBytes.length;
|
|
|
|
|
|
|
|
|
|
|
|
if (qos > 0) {
|
|
|
|
|
|
remainingLength += 2;
|
|
|
|
|
|
packet += this.encodeRemainingLength(remainingLength);
|
|
|
|
|
|
packet += topicBytes;
|
|
|
|
|
|
packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF);
|
|
|
|
|
|
this.messageId++;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
packet += this.encodeRemainingLength(remainingLength);
|
|
|
|
|
|
packet += topicBytes;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
packet += msgBytes;
|
|
|
|
|
|
this.sendPacket(packet);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送 SUBSCRIBE 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
sendSubscribePacket(topic, qos) {
|
|
|
|
|
|
let packet = String.fromCharCode(0x82);
|
|
|
|
|
|
|
|
|
|
|
|
const topicBytes = this.encodeString(topic);
|
|
|
|
|
|
let remainingLength = 2 + topicBytes.length + 1; // packetId + topicBytes + qos
|
|
|
|
|
|
|
|
|
|
|
|
if (this.config.protocolVersion === 5) {
|
|
|
|
|
|
remainingLength += 1; // property length byte (0x00)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
packet += this.encodeRemainingLength(remainingLength);
|
|
|
|
|
|
packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF);
|
|
|
|
|
|
|
|
|
|
|
|
if (this.config.protocolVersion === 5) {
|
|
|
|
|
|
packet += String.fromCharCode(0x00); // property length = 0
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
packet += topicBytes;
|
|
|
|
|
|
packet += String.fromCharCode(qos);
|
|
|
|
|
|
|
|
|
|
|
|
this.messageId++;
|
|
|
|
|
|
this.sendPacket(packet);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送 UNSUBSCRIBE 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
sendUnsubscribePacket(topic) {
|
|
|
|
|
|
let packet = String.fromCharCode(0xA2);
|
|
|
|
|
|
|
|
|
|
|
|
const topicBytes = this.encodeString(topic);
|
|
|
|
|
|
let remainingLength = 2 + topicBytes.length; // packetId + topicBytes
|
|
|
|
|
|
|
|
|
|
|
|
if (this.config.protocolVersion === 5) {
|
|
|
|
|
|
remainingLength += 1; // property length byte (0x00)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
packet += this.encodeRemainingLength(remainingLength);
|
|
|
|
|
|
packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF);
|
|
|
|
|
|
|
|
|
|
|
|
if (this.config.protocolVersion === 5) {
|
|
|
|
|
|
packet += String.fromCharCode(0x00); // property length = 0
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
packet += topicBytes;
|
|
|
|
|
|
|
|
|
|
|
|
this.messageId++;
|
|
|
|
|
|
this.sendPacket(packet);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送 PUBACK 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
sendPubackPacket(packetId) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
let packet = String.fromCharCode(0x40);
|
|
|
|
|
|
packet += String.fromCharCode(0x02);
|
|
|
|
|
|
packet += String.fromCharCode((packetId >> 8) & 0xFF, packetId & 0xFF);
|
|
|
|
|
|
this.sendPacket(packet);
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
this.emit('error', new Error(`发送 PUBACK 失败: ${error.message}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发送 PINGREQ 包
|
|
|
|
|
|
*/
|
|
|
|
|
|
sendPingPacket() {
|
|
|
|
|
|
try {
|
|
|
|
|
|
// 清除之前的超时定时器
|
|
|
|
|
|
if (this.pingTimeoutTimer) {
|
|
|
|
|
|
clearTimeout(this.pingTimeoutTimer);
|
|
|
|
|
|
this.pingTimeoutTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
console.log('[MQTT调试] 发送 PINGREQ 包');
|
|
|
|
|
|
let packet = String.fromCharCode(0xC0);
|
|
|
|
|
|
packet += String.fromCharCode(0x00);
|
|
|
|
|
|
this.sendPacket(packet);
|
|
|
|
|
|
|
|
|
|
|
|
// 设置心跳响应超时定时器
|
|
|
|
|
|
const {
|
|
|
|
|
|
keepAlive
|
|
|
|
|
|
} = this.config;
|
|
|
|
|
|
if (keepAlive > 0) {
|
|
|
|
|
|
const timeout = keepAlive * 1000 * 1.5; // 1.5倍keepAlive时间
|
|
|
|
|
|
console.log('[MQTT调试] 设置心跳响应超时:', timeout, 'ms');
|
|
|
|
|
|
this.pingTimeoutTimer = setTimeout(() => {
|
|
|
|
|
|
console.log('[MQTT调试] 心跳响应超时,服务器未回复 PINGRESP');
|
|
|
|
|
|
this.emit('error', new Error('心跳响应超时,服务器未回复 PINGRESP'));
|
|
|
|
|
|
// 主动断开连接
|
|
|
|
|
|
this.disconnect();
|
|
|
|
|
|
}, timeout);
|
|
|
|
|
|
}
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
this.emit('error', new Error(`发送 PINGREQ 失败: ${error.message}`));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 启动心跳定时器
|
|
|
|
|
|
*/
|
|
|
|
|
|
startPingInterval() {
|
|
|
|
|
|
this.stopPingInterval();
|
|
|
|
|
|
const {
|
|
|
|
|
|
keepAlive
|
|
|
|
|
|
} = this.config;
|
|
|
|
|
|
console.log('[MQTT调试] 启动心跳定时器,keepAlive:', keepAlive);
|
|
|
|
|
|
if (keepAlive > 0) {
|
|
|
|
|
|
const interval = (keepAlive * 1000) * 0.8;
|
|
|
|
|
|
console.log('[MQTT调试] 心跳间隔:', interval, 'ms');
|
|
|
|
|
|
this.pingIntervalTimer = setInterval(() => {
|
|
|
|
|
|
if (this.connectionStatus === 'connected') {
|
|
|
|
|
|
console.log('[MQTT调试] 发送心跳 PINGREQ');
|
|
|
|
|
|
this.sendPingPacket();
|
|
|
|
|
|
}
|
|
|
|
|
|
}, interval);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 停止心跳定时器
|
|
|
|
|
|
*/
|
|
|
|
|
|
stopPingInterval() {
|
|
|
|
|
|
if (this.pingIntervalTimer) {
|
|
|
|
|
|
console.log('[MQTT调试] 停止心跳定时器');
|
|
|
|
|
|
clearInterval(this.pingIntervalTimer);
|
|
|
|
|
|
this.pingIntervalTimer = null;
|
|
|
|
|
|
}
|
|
|
|
|
|
// 清除心跳响应超时定时器
|
|
|
|
|
|
if (this.pingTimeoutTimer) {
|
|
|
|
|
|
clearTimeout(this.pingTimeoutTimer);
|
|
|
|
|
|
this.pingTimeoutTimer = null;
|
|
|
|
|
|
console.log('[MQTT调试] 心跳响应超时定时器已清除');
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// ========== 工具方法 ==========
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 将各种数据格式转换为字节数组
|
|
|
|
|
|
*/
|
|
|
|
|
|
ab2buffer(data) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
if (typeof data === 'string') {
|
|
|
|
|
|
const bytes = [];
|
|
|
|
|
|
for (let i = 0; i < data.length; i++) {
|
|
|
|
|
|
bytes.push(data.charCodeAt(i) & 0xFF);
|
|
|
|
|
|
}
|
|
|
|
|
|
return bytes;
|
|
|
|
|
|
} else if (data instanceof ArrayBuffer) {
|
|
|
|
|
|
return Array.from(new Uint8Array(data));
|
|
|
|
|
|
} else if (Array.isArray(data)) {
|
|
|
|
|
|
return data;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
return Array.from(data);
|
|
|
|
|
|
}
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
this.emit('error', new Error(`数据转换失败: ${error.message}`));
|
|
|
|
|
|
return [];
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 字节数组转字符串(UTF-8 解码)
|
|
|
|
|
|
*/
|
|
|
|
|
|
bytesToString(bytes) {
|
|
|
|
|
|
try {
|
|
|
|
|
|
if (!bytes || bytes.length === 0) return '';
|
|
|
|
|
|
|
|
|
|
|
|
// 优先使用 TextDecoder 进行 UTF-8 解码
|
|
|
|
|
|
if (typeof TextDecoder !== 'undefined') {
|
|
|
|
|
|
try {
|
|
|
|
|
|
const decoder = new TextDecoder('utf-8');
|
|
|
|
|
|
const array = new Uint8Array(bytes);
|
|
|
|
|
|
return decoder.decode(array);
|
|
|
|
|
|
} catch (decodeError) {
|
|
|
|
|
|
console.warn('TextDecoder 解码失败,回退到手动解码:', decodeError);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 手动 UTF-8 解码
|
|
|
|
|
|
let result = '';
|
|
|
|
|
|
let i = 0;
|
|
|
|
|
|
while (i < bytes.length) {
|
|
|
|
|
|
const byte = bytes[i];
|
|
|
|
|
|
|
|
|
|
|
|
// ASCII 字符 (0-127)
|
|
|
|
|
|
if (byte < 128) {
|
|
|
|
|
|
result += String.fromCharCode(byte);
|
|
|
|
|
|
i++;
|
|
|
|
|
|
}
|
|
|
|
|
|
// 2字节 UTF-8 字符
|
|
|
|
|
|
else if ((byte & 0xE0) === 0xC0 && i + 1 < bytes.length) {
|
|
|
|
|
|
const byte2 = bytes[i + 1];
|
|
|
|
|
|
if ((byte2 & 0xC0) === 0x80) {
|
|
|
|
|
|
const codePoint = ((byte & 0x1F) << 6) | (byte2 & 0x3F);
|
|
|
|
|
|
result += String.fromCharCode(codePoint);
|
|
|
|
|
|
i += 2;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 无效的 UTF-8 序列,跳过
|
|
|
|
|
|
i++;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
// 3字节 UTF-8 字符
|
|
|
|
|
|
else if ((byte & 0xF0) === 0xE0 && i + 2 < bytes.length) {
|
|
|
|
|
|
const byte2 = bytes[i + 1];
|
|
|
|
|
|
const byte3 = bytes[i + 2];
|
|
|
|
|
|
if ((byte2 & 0xC0) === 0x80 && (byte3 & 0xC0) === 0x80) {
|
|
|
|
|
|
const codePoint = ((byte & 0x0F) << 12) | ((byte2 & 0x3F) << 6) | (byte3 & 0x3F);
|
|
|
|
|
|
result += String.fromCharCode(codePoint);
|
|
|
|
|
|
i += 3;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 无效的 UTF-8 序列,跳过
|
|
|
|
|
|
i++;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
// 4字节 UTF-8 字符(代理对)
|
|
|
|
|
|
else if ((byte & 0xF8) === 0xF0 && i + 3 < bytes.length) {
|
|
|
|
|
|
const byte2 = bytes[i + 1];
|
|
|
|
|
|
const byte3 = bytes[i + 2];
|
|
|
|
|
|
const byte4 = bytes[i + 3];
|
|
|
|
|
|
if ((byte2 & 0xC0) === 0x80 && (byte3 & 0xC0) === 0x80 && (byte4 & 0xC0) === 0x80) {
|
|
|
|
|
|
const codePoint = ((byte & 0x07) << 18) | ((byte2 & 0x3F) << 12) | ((byte3 & 0x3F) << 6) | (
|
|
|
|
|
|
byte4 & 0x3F);
|
|
|
|
|
|
// 代理对处理
|
|
|
|
|
|
if (codePoint <= 0xFFFF) {
|
|
|
|
|
|
result += String.fromCharCode(codePoint);
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 转换为代理对
|
|
|
|
|
|
const highSurrogate = 0xD800 + (((codePoint - 0x10000) >> 10) & 0x3FF);
|
|
|
|
|
|
const lowSurrogate = 0xDC00 + ((codePoint - 0x10000) & 0x3FF);
|
|
|
|
|
|
result += String.fromCharCode(highSurrogate, lowSurrogate);
|
|
|
|
|
|
}
|
|
|
|
|
|
i += 4;
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 无效的 UTF-8 序列,跳过
|
|
|
|
|
|
i++;
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 无效的 UTF-8 序列,跳过
|
|
|
|
|
|
i++;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
return result;
|
|
|
|
|
|
} catch (error) {
|
|
|
|
|
|
console.error('字节转字符串失败:', error);
|
|
|
|
|
|
this.emit('error', new Error(`字节转字符串失败: ${error.message}`));
|
|
|
|
|
|
return '';
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 编码字符串(添加长度前缀)
|
|
|
|
|
|
*/
|
|
|
|
|
|
encodeString(str) {
|
|
|
|
|
|
let result = '';
|
|
|
|
|
|
const bytes = this.stringToBytes(str);
|
|
|
|
|
|
result += String.fromCharCode((bytes.length >> 8) & 0xFF);
|
|
|
|
|
|
result += String.fromCharCode(bytes.length & 0xFF);
|
|
|
|
|
|
for (let i = 0; i < bytes.length; i++) {
|
|
|
|
|
|
result += String.fromCharCode(bytes[i]);
|
|
|
|
|
|
}
|
|
|
|
|
|
return result;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 字符串转字节数组(UTF-8 编码)
|
|
|
|
|
|
*/
|
|
|
|
|
|
stringToBytes(str) {
|
|
|
|
|
|
const bytes = [];
|
|
|
|
|
|
for (let i = 0; i < str.length; i++) {
|
|
|
|
|
|
let code = str.charCodeAt(i);
|
|
|
|
|
|
|
|
|
|
|
|
if (code <= 0x7F) {
|
|
|
|
|
|
// 1字节 UTF-8
|
|
|
|
|
|
bytes.push(code);
|
|
|
|
|
|
} else if (code <= 0x7FF) {
|
|
|
|
|
|
// 2字节 UTF-8
|
|
|
|
|
|
bytes.push(0xC0 | (code >> 6));
|
|
|
|
|
|
bytes.push(0x80 | (code & 0x3F));
|
|
|
|
|
|
} else if (code >= 0xD800 && code <= 0xDFFF) {
|
|
|
|
|
|
// 代理对 - 4字节 UTF-8
|
|
|
|
|
|
i++;
|
|
|
|
|
|
if (i < str.length) {
|
|
|
|
|
|
const high = code;
|
|
|
|
|
|
const low = str.charCodeAt(i);
|
|
|
|
|
|
const fullCode = ((high & 0x3FF) << 10) + (low & 0x3FF) + 0x10000;
|
|
|
|
|
|
bytes.push(0xF0 | (fullCode >> 18));
|
|
|
|
|
|
bytes.push(0x80 | ((fullCode >> 12) & 0x3F));
|
|
|
|
|
|
bytes.push(0x80 | ((fullCode >> 6) & 0x3F));
|
|
|
|
|
|
bytes.push(0x80 | (fullCode & 0x3F));
|
|
|
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
|
|
|
// 3字节 UTF-8
|
|
|
|
|
|
bytes.push(0xE0 | (code >> 12));
|
|
|
|
|
|
bytes.push(0x80 | ((code >> 6) & 0x3F));
|
|
|
|
|
|
bytes.push(0x80 | (code & 0x3F));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
return bytes;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 编码剩余长度
|
|
|
|
|
|
*/
|
|
|
|
|
|
encodeRemainingLength(length) {
|
|
|
|
|
|
let result = '';
|
|
|
|
|
|
do {
|
|
|
|
|
|
let byte = length % 128;
|
|
|
|
|
|
length = Math.floor(length / 128);
|
|
|
|
|
|
if (length > 0) {
|
|
|
|
|
|
byte |= 0x80;
|
|
|
|
|
|
}
|
|
|
|
|
|
result += String.fromCharCode(byte);
|
|
|
|
|
|
} while (length > 0);
|
|
|
|
|
|
return result;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 导出类
|
|
|
|
|
|
export default MQTTClient;
|