Files
smart_storage_app/static/lib/mqtt.js
2026-03-11 08:47:54 +08:00

1065 lines
28 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
 * MQTT client plugin for uni-app.
 *
 * Speaks a subset of MQTT 3.1.1 / 5.0 over the uni-app WebSocket API
 * (`uni.connectSocket`). Supported packets: CONNECT/CONNACK, PUBLISH with
 * QoS 0-2 acknowledgements, SUBSCRIBE/SUBACK, UNSUBSCRIBE/UNSUBACK,
 * PINGREQ/PINGRESP and DISCONNECT.
 *
 * Events (register with `on`): 'connect', 'disconnect', 'error', 'message',
 * 'statusChange', 'subscribed', 'unsubscribed'.
 */
class MQTTClient {
  /**
   * @param {Object} options Connection options merged over the defaults
   *   (see `reSetconstructor`).
   */
  constructor(options = {}) {
    this.reSetconstructor(options);
    // Listener registry. 'subscribed' / 'unsubscribed' are listed because the
    // SUBACK / UNSUBACK handlers emit them; without an entry here `on()` would
    // silently refuse to register a listener for them.
    this.eventListeners = {
      connect: [],
      disconnect: [],
      error: [],
      message: [],
      statusChange: [],
      subscribed: [],
      unsubscribed: []
    };
    // Bind the public API so the methods can be passed around as callbacks.
    ['on', 'off', 'connect', 'disconnect', 'publish', 'subscribe', 'unsubscribe'].forEach((method) => {
      this[method] = this[method].bind(this);
    });
  }

  /**
   * (Re)initialise configuration and runtime state.
   *
   * Any timers or socket left over from a previous configuration are released
   * first so that re-configuring a live client does not leak them. Registered
   * event listeners are kept.
   *
   * Recognised options: host, port, path, ssl, clientId, username, password,
   * protocolVersion (4 = MQTT 3.1.1, 5 = MQTT 5.0), clean, cleanStart,
   * keepAlive (seconds), sessionExpiryInterval, connectTimeout (ms),
   * reconnectPeriod (ms, 0 disables auto-reconnect), debug.
   *
   * @param {Object} options User options merged over the defaults.
   */
  reSetconstructor(options = {}) {
    const leftoverSocket = this.socket;
    if (this.activeConnection) {
      // Events still in flight from the old socket must not touch the new state.
      this.activeConnection.retired = true;
    }
    this.clearConnectTimeout();
    this.clearReconnectTimer();
    this.stopPingInterval();

    this.config = Object.assign({
      path: '/ws',
      ssl: false, // true => wss://
      protocolVersion: 4, // 4 for MQTT 3.1.1
      clean: false,
      cleanStart: false, // for MQTT 5.0
      keepAlive: 60,
      sessionExpiryInterval: 4294967295,
      connectTimeout: 10000,
      reconnectPeriod: 10000,
      debug: false // verbose console logging
    }, options);

    this.socket = null;
    this.activeConnection = null;
    this.connectionStatus = 'disconnected'; // 'disconnected', 'connecting', 'connected', 'error'
    this.subscribedTopics = {};
    this.messageId = 1;
    this.receiveBuffer = []; // bytes received but not yet forming a full packet
    this.incomingQos2Ids = new Set(); // QoS 2 publishes delivered but not yet released
    this.shouldReconnect = false; // becomes true once a session has been established
    this.connectTimeoutTimer = null;
    this.reconnectTimer = null;
    this.pingIntervalTimer = null;
    this.pingTimeoutTimer = null;

    if (leftoverSocket) {
      this.closeSocketQuietly(leftoverSocket);
    }
  }

  /**
   * Register an event listener.
   * @param {string} event One of 'connect', 'disconnect', 'error', 'message',
   *   'statusChange', 'subscribed', 'unsubscribed'. Unknown names are ignored.
   * @param {Function} callback Listener to invoke.
   */
  on(event, callback) {
    const listeners = this.eventListeners[event];
    if (listeners && typeof callback === 'function') {
      listeners.push(callback);
    }
  }

  /**
   * Remove a previously registered listener.
   * @param {string} event Event name.
   * @param {Function} callback The listener passed to `on`.
   */
  off(event, callback) {
    const listeners = this.eventListeners[event];
    if (!listeners) {
      return;
    }
    const index = listeners.indexOf(callback);
    if (index > -1) {
      listeners.splice(index, 1);
    }
  }

  /**
   * Invoke every listener of an event. A throwing listener is logged and does
   * not prevent the remaining listeners from running.
   * @param {string} event Event name.
   * @param {...any} args Arguments forwarded to the listeners.
   */
  emit(event, ...args) {
    const listeners = this.eventListeners[event];
    if (!listeners) {
      return;
    }
    // Iterate over a copy so a listener may call off() without skipping others.
    listeners.slice().forEach((callback) => {
      try {
        callback(...args);
      } catch (err) {
        console.error(`Error in ${event} event listener:`, err);
      }
    });
  }

  /**
   * Update the connection status and emit 'statusChange' when it changed.
   * @param {string} status New status.
   */
  setConnectionStatus(status) {
    if (this.connectionStatus !== status) {
      this.connectionStatus = status;
      this.emit('statusChange', status);
    }
  }

  /**
   * Open the WebSocket and start the MQTT handshake.
   *
   * The connect timeout covers the whole handshake (it is cleared when CONNACK
   * arrives, not merely when the WebSocket opens). A socket that was still
   * held from an earlier attempt is closed and its late events are ignored.
   */
  connect() {
    const {
      host,
      port,
      path,
      clientId,
      username,
      password,
      protocolVersion,
      clean,
      cleanStart,
      keepAlive,
      sessionExpiryInterval,
      connectTimeout
    } = this.config;
    if (!host) {
      this.emit('error', new Error('请输入主机地址'));
      return;
    }

    // Drop everything belonging to a previous attempt.
    this.clearReconnectTimer();
    this.clearConnectTimeout();
    this.stopPingInterval();
    if (this.activeConnection) {
      this.activeConnection.retired = true;
    }
    const connection = { retired: false };
    this.activeConnection = connection;
    const previousSocket = this.socket;
    this.socket = null;
    if (previousSocket) {
      this.closeSocketQuietly(previousSocket);
    }
    this.receiveBuffer = [];

    this.setConnectionStatus('connecting');
    const url = this.buildUrl(host, port, path);
    this.debugLog('connecting to', url);

    const failConnect = (err) => {
      if (connection.retired) {
        return;
      }
      connection.retired = true;
      this.clearConnectTimeout();
      this.setConnectionStatus('error');
      this.emit('error', new Error(`WebSocket 连接失败: ${JSON.stringify(err)}`));
      // No close event will follow a failed connectSocket call.
      this.scheduleReconnect();
    };

    let socketTask = null;
    try {
      socketTask = uni.connectSocket({
        url,
        protocols: ['mqtt', 'mqttv3.1.1'],
        header: {
          'content-type': 'application/octet-stream'
        },
        // Passing callbacks makes uni.connectSocket return a SocketTask.
        success: () => {},
        fail: failConnect
      });
    } catch (err) {
      failConnect({ errMsg: String(err && err.message) });
      return;
    }
    if (connection.retired) {
      return; // `fail` already ran synchronously
    }
    if (!socketTask || typeof socketTask.onOpen !== 'function') {
      failConnect({ errMsg: 'uni.connectSocket did not return a SocketTask' });
      return;
    }
    this.socket = socketTask;

    if (connectTimeout > 0) {
      this.connectTimeoutTimer = setTimeout(() => {
        this.connectTimeoutTimer = null;
        if (connection.retired || this.connectionStatus !== 'connecting') {
          return;
        }
        this.setConnectionStatus('error');
        this.emit('error', new Error(`连接超时 (${connectTimeout}ms)`));
        if (this.socket === socketTask) {
          this.socket = null;
        }
        this.closeSocketQuietly(socketTask);
      }, connectTimeout);
    }

    socketTask.onOpen(() => {
      if (connection.retired) {
        return;
      }
      this.debugLog('WebSocket 连接已打开');
      this.sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart,
        keepAlive, sessionExpiryInterval);
    });
    socketTask.onMessage((res) => {
      if (connection.retired) {
        return;
      }
      this.handleMQTTMessage(res.data);
    });
    socketTask.onError((err) => {
      if (connection.retired) {
        return;
      }
      this.clearConnectTimeout();
      this.stopPingInterval();
      this.setConnectionStatus('error');
      this.emit('error', new Error(`WebSocket 错误: ${JSON.stringify(err)}`));
    });
    socketTask.onClose((res) => {
      if (connection.retired) {
        return;
      }
      this.debugLog('WebSocket 关闭', res);
      if (this.socket === socketTask) {
        this.socket = null;
      }
      this.clearConnectTimeout();
      this.stopPingInterval();
      this.scheduleReconnect();
      this.setConnectionStatus('disconnected');
      this.emit('disconnect', res);
    });
  }

  /**
   * Close the connection on the caller's request.
   *
   * Cancels auto-reconnect, forgets the subscription list and sends an MQTT
   * DISCONNECT (only when a session is established). The socket itself is
   * closed 100 ms later so the DISCONNECT packet has a chance to be flushed.
   */
  disconnect() {
    this.shouldReconnect = false;
    this.clearConnectTimeout();
    this.clearReconnectTimer();
    this.stopPingInterval();

    const socket = this.socket;
    if (!socket) {
      return;
    }
    const connection = this.activeConnection;
    if (this.connectionStatus === 'connected') {
      this.sendDisconnectPacket();
    }
    this.subscribedTopics = {};
    this.incomingQos2Ids.clear();

    setTimeout(() => {
      // Work on the captured socket: by now `this.socket` may be null (already
      // closed by the peer) or a newer socket created by connect().
      if (this.socket === socket) {
        this.socket = null;
        this.closeSocketQuietly(socket);
      }
      if (!connection || !connection.retired) {
        this.setConnectionStatus('disconnected');
      }
    }, 100);
  }

  /**
   * Publish a message. Failures are reported through the 'error' event.
   * @param {string} topic Topic name.
   * @param {string|ArrayBuffer|Uint8Array|number[]} message Payload; strings
   *   are UTF-8 encoded, binary inputs are sent as-is.
   * @param {number} qos QoS level (0, 1, 2).
   */
  publish(topic, message, qos = 0) {
    if (this.connectionStatus !== 'connected') {
      this.emit('error', new Error('请先连接'));
      return;
    }
    if (!topic) {
      this.emit('error', new Error('请输入主题'));
      return;
    }
    try {
      this.sendPublishPacket(topic, message, qos);
      this.emit('message', {
        type: 'send',
        topic,
        message
      });
    } catch (error) {
      this.emit('error', new Error(`发布失败: ${error.message}`));
    }
  }

  /**
   * Subscribe to a topic filter. The filter is remembered and re-subscribed
   * after an automatic reconnect.
   * @param {string} topic Topic filter.
   * @param {number} qos Requested QoS level (0, 1, 2).
   */
  subscribe(topic, qos = 0) {
    if (this.connectionStatus !== 'connected') {
      this.emit('error', new Error('请先连接'));
      return;
    }
    if (!topic) {
      this.emit('error', new Error('请输入订阅主题'));
      return;
    }
    try {
      this.sendSubscribePacket(topic, qos);
      this.subscribedTopics[topic] = qos;
    } catch (error) {
      this.emit('error', new Error(`订阅失败: ${error.message}`));
    }
  }

  /**
   * Unsubscribe from a topic filter.
   * @param {string} topic Topic filter.
   */
  unsubscribe(topic) {
    if (this.connectionStatus !== 'connected') {
      this.emit('error', new Error('请先连接'));
      return;
    }
    if (!topic) {
      this.emit('error', new Error('请输入取消订阅的主题'));
      return;
    }
    try {
      this.sendUnsubscribePacket(topic);
      delete this.subscribedTopics[topic];
    } catch (error) {
      this.emit('error', new Error(`取消订阅失败: ${error.message}`));
    }
  }

  /**
   * @returns {string} Current connection status.
   */
  getStatus() {
    return this.connectionStatus;
  }

  /**
   * @returns {Array} Topic filters currently remembered as subscribed.
   */
  getSubscribedTopics() {
    return Object.keys(this.subscribedTopics);
  }

  // ========== Internal helpers ==========

  /**
   * Print a debug line when `config.debug` is enabled.
   * @param {...any} args Values to log.
   */
  debugLog(...args) {
    if (this.config && this.config.debug) {
      console.log('[MQTT调试]', ...args);
    }
  }

  /**
   * Build the WebSocket URL. The port is omitted when not configured and the
   * path always starts with '/'.
   */
  buildUrl(host, port, path) {
    const scheme = this.config.ssl ? 'wss' : 'ws';
    const portPart = port == null || port === '' ? '' : `:${port}`;
    let urlPath = path == null ? '' : String(path);
    if (!urlPath.startsWith('/')) {
      urlPath = '/' + urlPath;
    }
    return `${scheme}://${host}${portPart}${urlPath}`;
  }

  /** Close a socket task, ignoring errors from an already closed socket. */
  closeSocketQuietly(socket) {
    try {
      socket.close();
    } catch (err) {
      this.debugLog('socket close failed', err);
    }
  }

  /** Cancel the pending connect timeout, if any. */
  clearConnectTimeout() {
    if (this.connectTimeoutTimer) {
      clearTimeout(this.connectTimeoutTimer);
      this.connectTimeoutTimer = null;
    }
  }

  /** Cancel the pending reconnect attempt, if any. */
  clearReconnectTimer() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Arrange a reconnect attempt after `reconnectPeriod` ms.
   *
   * Only active once a session has been established and until disconnect()
   * is called, so a failing first connect does not loop while a dropped or
   * failed re-established connection keeps being retried.
   */
  scheduleReconnect() {
    const { reconnectPeriod } = this.config;
    if (!this.shouldReconnect || !(reconnectPeriod > 0) || this.reconnectTimer) {
      return;
    }
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, reconnectPeriod);
  }

  /** @returns {boolean} Whether the value is an ArrayBuffer (any realm). */
  isArrayBuffer(value) {
    return value instanceof ArrayBuffer ||
      Object.prototype.toString.call(value) === '[object ArrayBuffer]';
  }

  /**
   * Return the next packet identifier, cycling through 1..65535
   * (0 is not a valid MQTT packet identifier).
   */
  nextPacketId() {
    let id = this.messageId;
    if (!(id >= 1 && id <= 0xFFFF)) {
      id = 1;
    }
    this.messageId = id === 0xFFFF ? 1 : id + 1;
    return id;
  }

  /**
   * Validate and normalise a QoS value.
   * @returns {number} 0, 1 or 2.
   * @throws {Error} When the value is not a valid QoS level.
   */
  normalizeQos(qos) {
    const level = Number(qos);
    if (level !== 0 && level !== 1 && level !== 2) {
      throw new Error(`无效的 QoS 等级: ${qos}`);
    }
    return level;
  }

  /**
   * Send one MQTT packet over the current socket.
   * @param {string|ArrayBuffer|Uint8Array|Array} packet Packet bytes. A string
   *   is treated as a binary string (one byte per character).
   */
  sendPacket(packet) {
    const socket = this.socket;
    if (!socket) {
      return;
    }
    let arrayBuffer;
    if (typeof packet === 'string') {
      arrayBuffer = Uint8Array.from(this.ab2buffer(packet)).buffer;
    } else if (this.isArrayBuffer(packet)) {
      arrayBuffer = packet;
    } else if (ArrayBuffer.isView(packet)) {
      arrayBuffer = packet.buffer.slice(packet.byteOffset, packet.byteOffset + packet.byteLength);
    } else if (Array.isArray(packet)) {
      arrayBuffer = Uint8Array.from(packet).buffer;
    } else {
      this.emit('error', new Error('未知的数据包格式'));
      return;
    }
    this.debugLog('MQTT 发送数据包', arrayBuffer.byteLength, '字节');
    const request = {
      data: arrayBuffer,
      success: () => {
        this.debugLog('MQTT 数据包发送成功');
      },
      fail: (err) => {
        console.error('MQTT 数据包发送失败:', err);
        this.emit('error', new Error(`数据发送失败: ${JSON.stringify(err)}`));
      }
    };
    try {
      if (typeof socket.send === 'function') {
        // Send on the task returned by connectSocket, so the data goes to this
        // client's connection rather than whichever socket the global API picks.
        socket.send(request);
      } else {
        uni.sendSocketMessage(request);
      }
    } catch (err) {
      this.emit('error', new Error(`数据发送失败: ${err.message}`));
    }
  }

  /**
   * Decode an MQTT variable byte integer.
   * @param {number[]} bytes Source bytes.
   * @param {number} start Index of the first length byte.
   * @returns {{value: number, next: number}|null} The value and the index after
   *   it, or null when more bytes are needed.
   * @throws {Error} When the encoding uses more than four bytes.
   */
  decodeVariableByteInteger(bytes, start) {
    let value = 0;
    let multiplier = 1;
    for (let i = 0; i < 4; i++) {
      const index = start + i;
      if (index >= bytes.length) {
        return null;
      }
      const encoded = bytes[index];
      value += (encoded & 0x7F) * multiplier;
      if ((encoded & 0x80) === 0) {
        return { value, next: index + 1 };
      }
      multiplier *= 128;
    }
    throw new Error('剩余长度编码无效');
  }

  /**
   * Read a two-byte packet identifier.
   * @returns {number|null} The identifier, or null when the packet is too short.
   */
  readPacketId(buffer, pos) {
    if (buffer.length < pos + 2) {
      return null;
    }
    return buffer[pos] * 256 + buffer[pos + 1];
  }

  /**
   * Feed received WebSocket data into the MQTT parser.
   *
   * WebSocket frames are not aligned with MQTT packets: one frame may carry
   * several packets or only part of one. Bytes are therefore buffered and
   * packets are cut out according to their remaining-length field.
   * @param {*} data Raw frame data (ArrayBuffer, typed array, byte array or
   *   binary string).
   */
  handleMQTTMessage(data) {
    try {
      const incoming = this.ab2buffer(data);
      if (incoming.length === 0) {
        return;
      }
      this.receiveBuffer = this.receiveBuffer.concat(incoming);
      // The buffer is re-read on every turn because a handler may reset it
      // (for example by calling connect()).
      while (this.receiveBuffer.length >= 2) {
        const pending = this.receiveBuffer;
        const lengthField = this.decodeVariableByteInteger(pending, 1);
        if (lengthField === null) {
          return; // length field still incomplete
        }
        const packetEnd = lengthField.next + lengthField.value;
        if (pending.length < packetEnd) {
          return; // body still incomplete
        }
        // Consume before dispatching so a failing handler cannot cause the
        // same packet to be processed again.
        this.receiveBuffer = pending.slice(packetEnd);
        try {
          this.dispatchPacket(pending.slice(0, packetEnd), lengthField.next);
        } catch (error) {
          console.error('[MQTT调试] 处理 MQTT 消息时出错:', error);
          this.emit('error', new Error(`处理 MQTT 消息时出错: ${error.message}`));
        }
      }
    } catch (error) {
      // Framing is broken; buffered bytes can no longer be interpreted.
      this.receiveBuffer = [];
      console.error('[MQTT调试] 处理 MQTT 消息时出错:', error);
      this.emit('error', new Error(`处理 MQTT 消息时出错: ${error.message}`));
    }
  }

  /**
   * Route one complete packet to its handler.
   * @param {number[]} packet Bytes of exactly one packet.
   * @param {number} pos Index of the first byte after the fixed header.
   */
  dispatchPacket(packet, pos) {
    const packetType = packet[0] & 0xF0;
    const flags = packet[0] & 0x0F;
    this.debugLog('数据包类型:', '0x' + packetType.toString(16).padStart(2, '0'), '标志位:', flags);
    switch (packetType) {
      case 0x20:
        this.handleConnackPacket(packet, pos);
        break;
      case 0x30:
        this.handlePublishPacket(packet, pos, flags);
        break;
      case 0x40:
        this.handlePubackPacket(packet, pos);
        break;
      case 0x50: { // PUBREC: second step of an outbound QoS 2 publish
        const packetId = this.readPacketId(packet, pos);
        const rejected = packet.length > pos + 2 && packet[pos + 2] >= 0x80;
        if (packetId !== null && !rejected) {
          this.sendAckPacket(0x62, packetId); // PUBREL
        }
        break;
      }
      case 0x60: { // PUBREL: the broker releases an inbound QoS 2 publish
        const packetId = this.readPacketId(packet, pos);
        if (packetId !== null) {
          this.incomingQos2Ids.delete(packetId);
          this.sendAckPacket(0x70, packetId); // PUBCOMP
        }
        break;
      }
      case 0x70: // PUBCOMP: outbound QoS 2 publish finished
        break;
      case 0x90:
        this.handleSubackPacket(packet, pos);
        break;
      case 0xB0:
        this.handleUnsubackPacket(packet, pos);
        break;
      case 0xD0: // PINGRESP: the broker is alive, cancel the heartbeat timeout
        if (this.pingTimeoutTimer) {
          clearTimeout(this.pingTimeoutTimer);
          this.pingTimeoutTimer = null;
        }
        break;
      case 0xE0:
        this.handleDisconnectPacket(packet, pos);
        break;
      default:
        this.debugLog('未知数据包类型:', packetType.toString(16));
    }
  }

  /**
   * Translate a CONNACK return / reason code into a message.
   * MQTT 3.1.1 uses codes 1-5, MQTT 5.0 uses codes from 128 upwards.
   */
  describeConnackCode(code) {
    const descriptions = {
      0: '成功',
      // MQTT 3.1.1 return codes
      1: '不支持的协议版本',
      2: '客户端标识符被拒绝',
      3: '服务器不可用',
      4: '用户名或密码错误',
      5: '未授权',
      // MQTT 5.0 reason codes
      128: '未指定错误',
      129: '报文格式错误',
      130: '协议错误',
      131: '实现特定错误',
      132: '不支持的协议版本',
      133: '客户端标识符无效',
      134: '用户名或密码错误',
      135: '未授权',
      136: '服务器不可用',
      137: '服务器繁忙',
      138: '已被禁止',
      140: '认证方法错误',
      144: '主题名无效',
      149: '报文过大',
      151: '超出配额',
      153: '载荷格式无效',
      154: '不支持保留消息',
      155: '不支持的 QoS',
      156: '请使用其他服务器',
      157: '服务器已迁移',
      159: '超出连接速率限制'
    };
    return descriptions[code] || `未知错误 (${code})`;
  }

  /**
   * Handle CONNACK: finish the handshake or report why it was refused.
   */
  handleConnackPacket(buffer, pos) {
    if (buffer.length < pos + 2) {
      this.emit('error', new Error('CONNACK 包长度不足'));
      return;
    }
    const reasonCode = buffer[pos + 1]; // buffer[pos] holds the acknowledge flags
    this.clearConnectTimeout();
    if (reasonCode !== 0) {
      this.setConnectionStatus('error');
      this.emit('error', new Error(`MQTT 连接失败: ${this.describeConnackCode(reasonCode)}`));
      return;
    }
    this.debugLog('CONNACK 成功');
    this.shouldReconnect = true;
    this.setConnectionStatus('connected');
    this.emit('connect');
    this.startPingInterval();
    // Restore the subscriptions that were active before a reconnect.
    Object.keys(this.subscribedTopics).forEach((topic) => {
      const qos = this.subscribedTopics[topic];
      try {
        this.sendSubscribePacket(topic, qos);
        this.debugLog(`重新订阅主题: ${topic}, QoS: ${qos}`);
      } catch (err) {
        console.error(`[MQTT调试] 重新订阅主题失败: ${topic}`, err);
      }
    });
  }

  /**
   * Handle an inbound PUBLISH and emit it as a 'message' event with
   * `{ type: 'receive', topic, payload, payloadBytes, qos }`.
   * @param {number[]} buffer Bytes of the PUBLISH packet.
   * @param {number} pos Index of the first variable-header byte.
   * @param {number} flags Low nibble of the fixed header.
   */
  handlePublishPacket(buffer, pos, flags) {
    try {
      const qos = (flags >> 1) & 0x03;
      if (qos === 3) {
        this.emit('error', new Error('PUBLISH 包 QoS 无效'));
        return;
      }
      if (buffer.length < pos + 2) {
        this.emit('error', new Error('PUBLISH 包主题长度字段不完整'));
        return;
      }
      const topicLen = buffer[pos] * 256 + buffer[pos + 1];
      pos += 2;
      if (buffer.length < pos + topicLen) {
        this.emit('error', new Error('PUBLISH 包主题数据不完整'));
        return;
      }
      const topic = this.bytesToString(buffer.slice(pos, pos + topicLen));
      pos += topicLen;

      let packetId = 0;
      if (qos > 0) {
        if (buffer.length < pos + 2) {
          this.emit('error', new Error('PUBLISH 包包ID字段不完整'));
          return;
        }
        packetId = buffer[pos] * 256 + buffer[pos + 1];
        pos += 2;
      }

      if (this.config.protocolVersion === 5) {
        // MQTT 5.0 inserts a property block between the header and the payload.
        const properties = this.decodeVariableByteInteger(buffer, pos);
        if (properties === null || buffer.length < properties.next + properties.value) {
          this.emit('error', new Error('PUBLISH 包属性字段不完整'));
          return;
        }
        pos = properties.next + properties.value;
      }

      const payloadBytes = buffer.slice(pos);
      // A QoS 2 publish repeated before its PUBREL is a retransmission and
      // must not be delivered twice.
      const isDuplicate = qos === 2 && this.incomingQos2Ids.has(packetId);
      if (!isDuplicate) {
        if (qos === 2) {
          this.incomingQos2Ids.add(packetId);
        }
        this.emit('message', {
          type: 'receive',
          topic,
          payload: this.bytesToString(payloadBytes),
          payloadBytes,
          qos
        });
      }
      if (qos === 1) {
        this.sendPubackPacket(packetId);
      } else if (qos === 2) {
        this.sendAckPacket(0x50, packetId); // PUBREC
      }
    } catch (error) {
      console.error('[MQTT调试] 处理 PUBLISH 包时出错:', error);
      this.emit('error', new Error(`处理 PUBLISH 包时出错: ${error.message}`));
    }
  }

  /**
   * Handle PUBACK (outbound QoS 1 publish acknowledged). Nothing is tracked
   * for outbound publishes, so this only logs.
   */
  handlePubackPacket(buffer, pos) {
    const packetId = this.readPacketId(buffer, pos);
    if (packetId !== null) {
      this.debugLog('PUBACK', packetId);
    }
  }

  /**
   * Handle SUBACK: emit 'subscribed' with the packet id and the granted
   * codes, and an 'error' when the broker refused a subscription (code >= 0x80).
   */
  handleSubackPacket(buffer, pos) {
    const packetId = this.readPacketId(buffer, pos);
    if (packetId === null) {
      return;
    }
    let cursor = pos + 2;
    if (this.config.protocolVersion === 5) {
      const properties = this.decodeVariableByteInteger(buffer, cursor);
      cursor = properties === null ? buffer.length : properties.next + properties.value;
    }
    const codes = buffer.slice(cursor);
    this.emit('subscribed', packetId, codes);
    if (codes.some((code) => code >= 0x80)) {
      this.emit('error', new Error(`订阅被服务器拒绝 (packetId ${packetId})`));
    }
  }

  /**
   * Handle UNSUBACK: emit 'unsubscribed' with the packet id.
   */
  handleUnsubackPacket(buffer, pos) {
    const packetId = this.readPacketId(buffer, pos);
    if (packetId !== null) {
      this.emit('unsubscribed', packetId);
    }
  }

  /**
   * Handle a DISCONNECT sent by the broker. The client does not reconnect
   * automatically after a broker-initiated disconnect.
   */
  handleDisconnectPacket(buffer, pos) {
    this.shouldReconnect = false;
    this.setConnectionStatus('disconnected');
    this.emit('disconnect', {
      type: 'server'
    });
  }

  /**
   * Build and send the CONNECT packet. Errors are reported through 'error'.
   */
  sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, keepAlive,
    sessionExpiryInterval) {
    try {
      this.debugLog('发送 CONNECT 包', 'protocolVersion:', protocolVersion);
      const isV5 = protocolVersion === 5;

      let connectFlags = 0x00;
      if (isV5 ? cleanStart : clean) connectFlags |= 0x02;
      if (username) connectFlags |= 0x80;
      if (password) connectFlags |= 0x40;

      // Protocol name "MQTT", protocol level, connect flags, keep alive.
      let body = [0x00, 0x04, 0x4D, 0x51, 0x54, 0x54, protocolVersion & 0xFF, connectFlags,
        (keepAlive >> 8) & 0xFF, keepAlive & 0xFF
      ];

      if (isV5) {
        let properties = [];
        if (!cleanStart && sessionExpiryInterval !== undefined) {
          // Session Expiry Interval (0x11), four-byte big-endian value.
          properties = [
            0x11,
            (sessionExpiryInterval >>> 24) & 0xFF,
            (sessionExpiryInterval >>> 16) & 0xFF,
            (sessionExpiryInterval >>> 8) & 0xFF,
            sessionExpiryInterval & 0xFF
          ];
        }
        body = body.concat(this.encodeVariableByteInteger(properties.length), properties);
      }

      // A missing client id is sent as a zero-length id; the broker decides
      // whether to accept it.
      body = body.concat(this.encodeStringBytes(clientId));
      if (username) body = body.concat(this.encodeStringBytes(username));
      if (password) body = body.concat(this.encodeStringBytes(password));

      this.sendPacket([0x10].concat(this.encodeVariableByteInteger(body.length), body));
    } catch (error) {
      this.emit('error', new Error(`发送 CONNECT 包失败: ${error.message}`));
    }
  }

  /**
   * Send DISCONNECT.
   */
  sendDisconnectPacket() {
    this.sendPacket([0xE0, 0x00]);
  }

  /**
   * Build and send a PUBLISH packet.
   * @throws {Error} On an invalid QoS or an over-long topic.
   */
  sendPublishPacket(topic, message, qos) {
    const level = this.normalizeQos(qos);
    let header = this.encodeStringBytes(topic);
    if (level > 0) {
      const packetId = this.nextPacketId();
      header.push((packetId >> 8) & 0xFF, packetId & 0xFF);
    }
    if (this.config.protocolVersion === 5) {
      header.push(0x00); // property length = 0
    }
    const payload = this.toPayloadBytes(message);
    const remainingLength = header.length + payload.length;
    this.sendPacket(
      [0x30 | (level << 1)].concat(this.encodeVariableByteInteger(remainingLength), header, payload)
    );
  }

  /**
   * Build and send a SUBSCRIBE packet for a single topic filter.
   * @throws {Error} On an invalid QoS or an over-long topic.
   */
  sendSubscribePacket(topic, qos) {
    const level = this.normalizeQos(qos);
    const topicBytes = this.encodeStringBytes(topic);
    const packetId = this.nextPacketId();
    let body = [(packetId >> 8) & 0xFF, packetId & 0xFF];
    if (this.config.protocolVersion === 5) {
      body.push(0x00); // property length = 0
    }
    body = body.concat(topicBytes, [level]);
    this.sendPacket([0x82].concat(this.encodeVariableByteInteger(body.length), body));
  }

  /**
   * Build and send an UNSUBSCRIBE packet for a single topic filter.
   * @throws {Error} On an over-long topic.
   */
  sendUnsubscribePacket(topic) {
    const topicBytes = this.encodeStringBytes(topic);
    const packetId = this.nextPacketId();
    let body = [(packetId >> 8) & 0xFF, packetId & 0xFF];
    if (this.config.protocolVersion === 5) {
      body.push(0x00); // property length = 0
    }
    body = body.concat(topicBytes);
    this.sendPacket([0xA2].concat(this.encodeVariableByteInteger(body.length), body));
  }

  /**
   * Send a four-byte acknowledgement (PUBACK, PUBREC, PUBREL or PUBCOMP).
   * @param {number} firstByte Fixed-header byte of the acknowledgement.
   * @param {number} packetId Packet identifier being acknowledged.
   */
  sendAckPacket(firstByte, packetId) {
    this.sendPacket([firstByte, 0x02, (packetId >> 8) & 0xFF, packetId & 0xFF]);
  }

  /**
   * Send PUBACK for an inbound QoS 1 publish.
   */
  sendPubackPacket(packetId) {
    try {
      this.sendAckPacket(0x40, packetId);
    } catch (error) {
      this.emit('error', new Error(`发送 PUBACK 失败: ${error.message}`));
    }
  }

  /**
   * Send PINGREQ and arm the heartbeat timeout (1.5 x keepAlive).
   *
   * A timeout that is already pending is left running: pings are sent more
   * often than the timeout lasts, so restarting it on every ping would keep
   * it from ever firing. When it fires, the socket is dropped so the normal
   * close / reconnect path takes over.
   */
  sendPingPacket() {
    try {
      this.debugLog('发送 PINGREQ 包');
      this.sendPacket([0xC0, 0x00]);
      const {
        keepAlive
      } = this.config;
      if (keepAlive > 0 && !this.pingTimeoutTimer) {
        const timeout = keepAlive * 1000 * 1.5;
        this.pingTimeoutTimer = setTimeout(() => {
          this.pingTimeoutTimer = null;
          this.emit('error', new Error('心跳响应超时,服务器未回复 PINGRESP'));
          this.stopPingInterval();
          this.setConnectionStatus('error');
          const socket = this.socket;
          if (socket) {
            this.socket = null;
            this.closeSocketQuietly(socket);
          }
        }, timeout);
      }
    } catch (error) {
      this.emit('error', new Error(`发送 PINGREQ 失败: ${error.message}`));
    }
  }

  /**
   * Start sending PINGREQ every 0.8 x keepAlive seconds.
   */
  startPingInterval() {
    this.stopPingInterval();
    const {
      keepAlive
    } = this.config;
    if (keepAlive > 0) {
      const interval = (keepAlive * 1000) * 0.8;
      this.debugLog('心跳间隔:', interval, 'ms');
      this.pingIntervalTimer = setInterval(() => {
        if (this.connectionStatus === 'connected') {
          this.sendPingPacket();
        }
      }, interval);
    }
  }

  /**
   * Stop the heartbeat interval and cancel a pending heartbeat timeout.
   */
  stopPingInterval() {
    if (this.pingIntervalTimer) {
      clearInterval(this.pingIntervalTimer);
      this.pingIntervalTimer = null;
    }
    if (this.pingTimeoutTimer) {
      clearTimeout(this.pingTimeoutTimer);
      this.pingTimeoutTimer = null;
    }
  }

  // ========== Utilities ==========

  /**
   * Convert supported binary representations to an array of byte values.
   * Strings are treated as binary strings (low byte of each UTF-16 unit).
   * @returns {number[]} Byte array; empty when conversion fails (an 'error'
   *   event is emitted in that case).
   */
  ab2buffer(data) {
    try {
      if (typeof data === 'string') {
        const bytes = [];
        for (let i = 0; i < data.length; i++) {
          bytes.push(data.charCodeAt(i) & 0xFF);
        }
        return bytes;
      }
      if (Array.isArray(data)) {
        return data;
      }
      if (this.isArrayBuffer(data)) {
        return Array.from(new Uint8Array(data));
      }
      if (ArrayBuffer.isView(data)) {
        return Array.from(new Uint8Array(data.buffer, data.byteOffset, data.byteLength));
      }
      return Array.from(data);
    } catch (error) {
      this.emit('error', new Error(`数据转换失败: ${error.message}`));
      return [];
    }
  }

  /**
   * Convert a publish payload to bytes: strings are UTF-8 encoded, binary
   * inputs are used as-is, null/undefined is an empty payload and anything
   * else is stringified.
   */
  toPayloadBytes(message) {
    if (typeof message === 'string') {
      return this.stringToBytes(message);
    }
    if (message == null) {
      return [];
    }
    if (Array.isArray(message) || this.isArrayBuffer(message) || ArrayBuffer.isView(message)) {
      return this.ab2buffer(message);
    }
    return this.stringToBytes(String(message));
  }

  /**
   * Decode UTF-8 bytes to a string. Uses TextDecoder when the runtime
   * provides it and a manual decoder otherwise; the manual decoder skips
   * invalid bytes.
   */
  bytesToString(bytes) {
    try {
      if (!bytes || bytes.length === 0) return '';
      if (typeof TextDecoder !== 'undefined') {
        try {
          return new TextDecoder('utf-8').decode(new Uint8Array(bytes));
        } catch (decodeError) {
          console.warn('TextDecoder 解码失败,回退到手动解码:', decodeError);
        }
      }
      let result = '';
      let i = 0;
      while (i < bytes.length) {
        const lead = bytes[i];
        if (lead < 0x80) {
          result += String.fromCharCode(lead);
          i++;
          continue;
        }
        let continuationCount = 0;
        let codePoint = 0;
        if ((lead & 0xE0) === 0xC0) {
          continuationCount = 1;
          codePoint = lead & 0x1F;
        } else if ((lead & 0xF0) === 0xE0) {
          continuationCount = 2;
          codePoint = lead & 0x0F;
        } else if ((lead & 0xF8) === 0xF0) {
          continuationCount = 3;
          codePoint = lead & 0x07;
        }
        let valid = continuationCount > 0 && i + continuationCount < bytes.length;
        for (let k = 1; valid && k <= continuationCount; k++) {
          const next = bytes[i + k];
          if ((next & 0xC0) === 0x80) {
            codePoint = (codePoint << 6) | (next & 0x3F);
          } else {
            valid = false;
          }
        }
        if (!valid || codePoint > 0x10FFFF) {
          i++; // invalid sequence: skip the lead byte and resynchronise
          continue;
        }
        if (codePoint > 0xFFFF) {
          const offset = codePoint - 0x10000;
          result += String.fromCharCode(0xD800 + (offset >> 10), 0xDC00 + (offset & 0x3FF));
        } else {
          result += String.fromCharCode(codePoint);
        }
        i += continuationCount + 1;
      }
      return result;
    } catch (error) {
      console.error('字节转字符串失败:', error);
      this.emit('error', new Error(`字节转字符串失败: ${error.message}`));
      return '';
    }
  }

  /**
   * Encode a string as an MQTT UTF-8 string (two-byte length + bytes).
   * @returns {number[]} Encoded bytes.
   * @throws {Error} When the encoded string exceeds 65535 bytes.
   */
  encodeStringBytes(str) {
    const body = this.stringToBytes(str);
    if (body.length > 0xFFFF) {
      throw new Error('字符串长度超过 65535 字节');
    }
    return [(body.length >> 8) & 0xFF, body.length & 0xFF].concat(body);
  }

  /**
   * Encode a string with its length prefix, returned as a binary string
   * (one character per byte).
   */
  encodeString(str) {
    return this.encodeStringBytes(str).map((byte) => String.fromCharCode(byte)).join('');
  }

  /**
   * UTF-8 encode a string. Unpaired surrogates are encoded as U+FFFD;
   * null/undefined yields an empty array.
   * @returns {number[]} UTF-8 bytes.
   */
  stringToBytes(str) {
    if (str == null) {
      return [];
    }
    const text = String(str);
    const bytes = [];
    for (let i = 0; i < text.length; i++) {
      let code = text.charCodeAt(i);
      if (code >= 0xD800 && code <= 0xDBFF && i + 1 < text.length) {
        const low = text.charCodeAt(i + 1);
        if (low >= 0xDC00 && low <= 0xDFFF) {
          code = 0x10000 + ((code - 0xD800) << 10) + (low - 0xDC00);
          i++;
        }
      }
      if (code >= 0xD800 && code <= 0xDFFF) {
        code = 0xFFFD; // unpaired surrogate
      }
      if (code <= 0x7F) {
        bytes.push(code);
      } else if (code <= 0x7FF) {
        bytes.push(0xC0 | (code >> 6), 0x80 | (code & 0x3F));
      } else if (code <= 0xFFFF) {
        bytes.push(0xE0 | (code >> 12), 0x80 | ((code >> 6) & 0x3F), 0x80 | (code & 0x3F));
      } else {
        bytes.push(
          0xF0 | (code >> 18),
          0x80 | ((code >> 12) & 0x3F),
          0x80 | ((code >> 6) & 0x3F),
          0x80 | (code & 0x3F)
        );
      }
    }
    return bytes;
  }

  /**
   * Encode a number as an MQTT variable byte integer.
   * @returns {number[]} One to four bytes.
   * @throws {Error} When the value is outside 0..268435455.
   */
  encodeVariableByteInteger(value) {
    if (!Number.isInteger(value) || value < 0 || value > 268435455) {
      throw new Error(`长度超出 MQTT 允许范围: ${value}`);
    }
    const encoded = [];
    let rest = value;
    do {
      let byte = rest % 128;
      rest = Math.floor(rest / 128);
      if (rest > 0) {
        byte |= 0x80;
      }
      encoded.push(byte);
    } while (rest > 0);
    return encoded;
  }

  /**
   * Encode the remaining-length field, returned as a binary string
   * (one character per byte).
   */
  encodeRemainingLength(length) {
    return this.encodeVariableByteInteger(length).map((byte) => String.fromCharCode(byte)).join('');
  }
}
// 导出类
export default MQTTClient;