/**
 * MQTT client plugin for uni-app.
 *
 * Speaks MQTT 3.1.1 / 5.0 over the uni-app WebSocket API (`uni.connectSocket`).
 * Supported: CONNECT, QoS 0/1 PUBLISH, SUBSCRIBE, UNSUBSCRIBE, keep-alive pings and
 * automatic reconnect. Incoming QoS 1 publishes are acknowledged with PUBACK.
 * The QoS 2 handshake (PUBREC / PUBREL / PUBCOMP) is not implemented.
 */
class MQTTClient {
  /**
   * @param {Object} options Connection options, merged over the defaults.
   * @param {string} options.host Broker host name (required before connect()).
   * @param {number|string} [options.port] Broker port; omitted from the URL when not set.
   * @param {string} [options.path='/ws'] WebSocket path.
   * @param {boolean} [options.ssl=false] Use `wss://` instead of `ws://`.
   * @param {string} [options.clientId] Client identifier; generated on first connect when missing.
   * @param {string} [options.username]
   * @param {string} [options.password]
   * @param {number} [options.protocolVersion=4] 4 for MQTT 3.1.1, 5 for MQTT 5.0.
   * @param {boolean} [options.clean=false] Clean Session flag (MQTT 3.1.1).
   * @param {boolean} [options.cleanStart=false] Clean Start flag (MQTT 5.0).
   * @param {number} [options.keepAlive=60] Keep-alive in seconds, 0 disables pings.
   * @param {number} [options.sessionExpiryInterval=4294967295] MQTT 5.0 session expiry in seconds.
   * @param {number} [options.connectTimeout=10000] Milliseconds to wait for CONNACK, 0 disables.
   * @param {number} [options.reconnectPeriod=10000] Milliseconds between reconnect attempts, 0 disables.
   * @param {boolean} [options.debug=true] Print debug logs to the console.
   */
  constructor(options = {}) {
    this.reSetconstructor(options);

    // Listener registry; only the event names listed here can be subscribed to.
    this.eventListeners = {
      connect: [],
      disconnect: [],
      error: [],
      message: [],
      statusChange: [],
      subscribed: [],
      unsubscribed: [],
    };

    // Bind the public API so methods can be passed around as plain callbacks.
    ['on', 'off', 'connect', 'disconnect', 'publish', 'subscribe', 'unsubscribe'].forEach((method) => {
      this[method] = this[method].bind(this);
    });
  }

  /**
   * Reset configuration and runtime state. Registered event listeners are kept.
   * Any timers or socket left over from a previous configuration are released first.
   * @param {Object} options Same options as the constructor.
   */
  reSetconstructor(options = {}) {
    this.clearConnectTimeout();
    this.clearReconnectTimer();
    this.stopPingInterval();
    if (this.socket) {
      try {
        this.socket.close();
      } catch (err) {
        console.warn('关闭旧连接失败:', err);
      }
    }
    // Bumping the sequence makes callbacks of any earlier socket inert.
    this.connectionSeq = (this.connectionSeq || 0) + 1;

    this.config = Object.assign(
      {
        path: '/ws',
        ssl: false,
        protocolVersion: 4, // 4 for MQTT 3.1.1
        clean: false,
        cleanStart: false, // for MQTT 5.0
        keepAlive: 60,
        sessionExpiryInterval: 4294967295,
        connectTimeout: 10000,
        reconnectPeriod: 10000,
        debug: true,
      },
      options
    );

    this.socket = null;
    this.connectionStatus = 'disconnected'; // 'disconnected', 'connecting', 'connected', 'error'
    this.subscribedTopics = {};
    this.messageId = 1;
    this.receiveBuffer = []; // bytes of an MQTT packet that is still incomplete
    this.reconnectOnClose = false; // true once connected, false after a deliberate disconnect
    this.connectTimeoutTimer = null;
    this.reconnectTimer = null;
    this.pingIntervalTimer = null;
    this.pingTimeoutTimer = null;
  }

  /**
   * Register an event listener.
   * @param {string} event 'connect', 'disconnect', 'error', 'message', 'statusChange',
   *   'subscribed' or 'unsubscribed'. Unknown names are ignored.
   * @param {Function} callback Listener function.
   */
  on(event, callback) {
    if (this.eventListeners[event] && typeof callback === 'function') {
      this.eventListeners[event].push(callback);
    }
  }

  /**
   * Remove a previously registered event listener.
   * @param {string} event Event name.
   * @param {Function} callback The listener passed to on().
   */
  off(event, callback) {
    const listeners = this.eventListeners[event];
    if (!listeners) {
      return;
    }
    const index = listeners.indexOf(callback);
    if (index > -1) {
      listeners.splice(index, 1);
    }
  }

  /**
   * Invoke every listener of an event. A throwing listener is logged and does not
   * prevent the remaining listeners from running.
   * @param {string} event Event name.
   * @param {...any} args Arguments passed to each listener.
   */
  emit(event, ...args) {
    const listeners = this.eventListeners && this.eventListeners[event];
    if (!listeners) {
      return;
    }
    // Iterate over a copy so listeners may call off() while being notified.
    listeners.slice().forEach((callback) => {
      try {
        callback(...args);
      } catch (err) {
        console.error(`Error in ${event} event listener:`, err);
      }
    });
  }

  /**
   * Update the connection status and emit 'statusChange' when it actually changes.
   * @param {string} status New status.
   */
  setConnectionStatus(status) {
    if (this.connectionStatus !== status) {
      this.connectionStatus = status;
      this.emit('statusChange', status);
    }
  }

  /**
   * Open the WebSocket and start the MQTT handshake.
   * Emits 'error' (and returns) when no host is configured.
   */
  connect() {
    const {
      host,
      port,
      path,
      username,
      password,
      protocolVersion,
      clean,
      cleanStart,
      keepAlive,
      sessionExpiryInterval,
      connectTimeout,
    } = this.config;

    // Never print the password.
    this.debugLog('连接参数:', Object.assign({}, this.config, password ? { password: '***' } : {}));

    if (!host) {
      this.emit('error', new Error('请输入主机地址'));
      return;
    }

    // A client identifier is mandatory for persistent sessions; keep a generated
    // one in the config so reconnects reuse it.
    if (!this.config.clientId) {
      this.config.clientId = `uniapp_${Math.random().toString(16).slice(2, 10)}`;
    }
    const clientId = this.config.clientId;

    // Drop anything that belongs to an earlier attempt.
    this.clearConnectTimeout();
    this.clearReconnectTimer();
    this.stopPingInterval();
    this.receiveBuffer = [];
    this.connectionSeq += 1;
    const seq = this.connectionSeq;
    const isStale = () => seq !== this.connectionSeq;

    this.setConnectionStatus('connecting');

    const rawPath = typeof path === 'string' ? path : '';
    const urlPath = rawPath.startsWith('/') ? rawPath : `/${rawPath}`;
    const scheme = this.config.ssl ? 'wss' : 'ws';
    const hasPort = port !== undefined && port !== null && port !== '';
    const url = `${scheme}://${host}${hasPort ? `:${port}` : ''}${urlPath}`;

    // Close the existing connection; its callbacks are stale from here on.
    if (this.socket) {
      this.socket.close();
      this.socket = null;
    }

    const socket = uni.connectSocket({
      url,
      protocols: ['mqtt', 'mqttv3.1.1'],
      header: {
        'content-type': 'application/octet-stream',
      },
      success: () => {
        // Connection request has been issued.
      },
      fail: (err) => {
        if (isStale()) {
          return;
        }
        this.clearConnectTimeout();
        this.setConnectionStatus('error');
        this.emit('error', new Error(`WebSocket 连接失败: ${JSON.stringify(err)}`));
        this.scheduleReconnect();
      },
    });

    if (!socket || typeof socket.onOpen !== 'function') {
      this.setConnectionStatus('error');
      this.emit('error', new Error('WebSocket 连接失败: 未返回 SocketTask'));
      return;
    }
    this.socket = socket;

    // The timeout covers the whole handshake: it is cleared when CONNACK arrives,
    // not when the WebSocket merely opens.
    if (connectTimeout > 0) {
      this.connectTimeoutTimer = setTimeout(() => {
        this.connectTimeoutTimer = null;
        if (isStale() || this.connectionStatus !== 'connecting') {
          return;
        }
        this.setConnectionStatus('error');
        this.emit('error', new Error(`连接超时 (${connectTimeout}ms)`));
        if (this.socket) {
          this.socket.close();
          this.socket = null;
        }
      }, connectTimeout);
    }

    socket.onOpen(() => {
      if (isStale()) {
        return;
      }
      this.debugLog('WebSocket 连接已打开');
      this.sendConnectPacket(
        clientId,
        username,
        password,
        protocolVersion,
        clean,
        cleanStart,
        keepAlive,
        sessionExpiryInterval
      );
    });

    socket.onMessage((res) => {
      if (isStale()) {
        return;
      }
      this.handleMQTTMessage(res.data);
    });

    socket.onError((err) => {
      if (isStale()) {
        return;
      }
      this.clearConnectTimeout();
      this.stopPingInterval();
      this.setConnectionStatus('error');
      this.emit('error', new Error(`WebSocket 错误: ${JSON.stringify(err)}`));
    });

    socket.onClose((res) => {
      if (isStale()) {
        return;
      }
      this.debugLog('WebSocket 关闭,原因:', res);
      this.clearConnectTimeout();
      this.stopPingInterval();
      this.scheduleReconnect();
      this.setConnectionStatus('disconnected');
      this.emit('disconnect', res);
    });
  }

  /**
   * Disconnect on purpose: send DISCONNECT, close the socket shortly afterwards and
   * forget all subscriptions. Automatic reconnect is disabled until the next
   * successful connect().
   */
  disconnect() {
    this.reconnectOnClose = false;
    this.clearConnectTimeout();
    this.clearReconnectTimer();
    this.stopPingInterval();

    const socket = this.socket;
    if (!socket) {
      return;
    }

    this.sendDisconnectPacket();
    // Give the DISCONNECT packet a moment to leave before closing the transport.
    setTimeout(() => {
      // connect() was called in the meantime: the new connection owns the state now.
      if (this.socket !== null && this.socket !== socket) {
        return;
      }
      socket.close();
      this.socket = null;
      this.setConnectionStatus('disconnected');
      this.subscribedTopics = {};
    }, 100);
  }

  /**
   * Publish a message.
   * @param {string} topic Topic name.
   * @param {string|ArrayBuffer|ArrayBufferView|number[]} message Payload. Strings are sent
   *   as UTF-8, binary values are sent as-is, other primitives are stringified.
   * @param {number} qos QoS level (0 or 1).
   */
  publish(topic, message, qos = 0) {
    if (this.connectionStatus !== 'connected') {
      this.emit('error', new Error('请先连接'));
      return;
    }
    if (!topic) {
      this.emit('error', new Error('请输入主题'));
      return;
    }

    try {
      this.sendPublishPacket(topic, message, qos);
      this.emit('message', {
        type: 'send',
        topic,
        message,
      });
    } catch (error) {
      this.emit('error', new Error(`发布失败: ${error.message}`));
    }
  }

  /**
   * Subscribe to a topic filter. The subscription is remembered and re-sent after a reconnect.
   * @param {string} topic Topic filter.
   * @param {number} qos Requested QoS level (0, 1, 2).
   */
  subscribe(topic, qos = 0) {
    if (this.connectionStatus !== 'connected') {
      this.emit('error', new Error('请先连接'));
      return;
    }
    if (!topic) {
      this.emit('error', new Error('请输入订阅主题'));
      return;
    }

    try {
      this.sendSubscribePacket(topic, qos);
      this.subscribedTopics[topic] = qos;
    } catch (error) {
      this.emit('error', new Error(`订阅失败: ${error.message}`));
    }
  }

  /**
   * Unsubscribe from a topic filter.
   * @param {string} topic Topic filter.
   */
  unsubscribe(topic) {
    if (this.connectionStatus !== 'connected') {
      this.emit('error', new Error('请先连接'));
      return;
    }
    if (!topic) {
      this.emit('error', new Error('请输入取消订阅的主题'));
      return;
    }

    try {
      this.sendUnsubscribePacket(topic);
      delete this.subscribedTopics[topic];
    } catch (error) {
      this.emit('error', new Error(`取消订阅失败: ${error.message}`));
    }
  }

  /**
   * @returns {string} Current connection status.
   */
  getStatus() {
    return this.connectionStatus;
  }

  /**
   * @returns {Array} Topic filters currently remembered as subscribed.
   */
  getSubscribedTopics() {
    return Object.keys(this.subscribedTopics);
  }

  // ========== Internal: timers, logging ==========

  /** Print a debug line when the `debug` option is enabled. */
  debugLog(...args) {
    if (this.config && this.config.debug) {
      console.log('[MQTT调试]', ...args);
    }
  }

  /** Cancel the pending connect timeout, if any. */
  clearConnectTimeout() {
    if (this.connectTimeoutTimer) {
      clearTimeout(this.connectTimeoutTimer);
      this.connectTimeoutTimer = null;
    }
  }

  /** Cancel the pending reconnect attempt, if any. */
  clearReconnectTimer() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Schedule one reconnect attempt when reconnecting is enabled, the client has been
   * connected before and no attempt is pending already.
   */
  scheduleReconnect() {
    const { reconnectPeriod } = this.config;
    if (!this.reconnectOnClose || !(reconnectPeriod > 0) || this.reconnectTimer) {
      return;
    }
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, reconnectPeriod);
  }

  /**
   * Return the current packet identifier and advance the counter, wrapping from
   * 65535 back to 1 (0 is not a legal packet identifier).
   * @returns {number} Packet identifier in the range 1..65535.
   */
  nextPacketId() {
    const current = Number(this.messageId);
    const id = current >= 1 && current <= 0xFFFF ? current : 1;
    this.messageId = id >= 0xFFFF ? 1 : id + 1;
    return id;
  }

  // ========== Internal: sending ==========

  /**
   * Send one raw packet over the current socket. Does nothing when no socket exists.
   * @param {string|ArrayBuffer|ArrayBufferView|Array} packet Binary string (one byte per
   *   character), buffer, typed array or array of byte values.
   */
  sendPacket(packet) {
    const socket = this.socket;
    if (!socket) {
      return;
    }

    let arrayBuffer;
    if (typeof packet === 'string') {
      arrayBuffer = new Uint8Array(this.ab2buffer(packet)).buffer;
    } else if (packet instanceof ArrayBuffer) {
      arrayBuffer = packet;
    } else if (ArrayBuffer.isView(packet)) {
      arrayBuffer = packet.buffer.slice(packet.byteOffset, packet.byteOffset + packet.byteLength);
    } else if (Array.isArray(packet)) {
      arrayBuffer = new Uint8Array(packet).buffer;
    } else {
      this.emit('error', new Error('未知的数据包格式'));
      return;
    }
    this.debugLog('MQTT 发送数据包:', arrayBuffer.byteLength, '字节');

    const request = {
      data: arrayBuffer,
      success: () => {
        this.debugLog('MQTT 数据包发送成功');
      },
      fail: (err) => {
        console.error('MQTT 数据包发送失败:', err);
        this.emit('error', new Error(`数据发送失败: ${JSON.stringify(err)}`));
      },
    };

    // Prefer the SocketTask that owns this connection; the global helper is only a
    // fallback for environments where the task has no send().
    if (typeof socket.send === 'function') {
      socket.send(request);
    } else {
      uni.sendSocketMessage(request);
    }
  }

  /**
   * Assemble a complete packet: fixed-header byte, remaining length, body.
   * @param {number} firstByte Packet type and flags.
   * @param {number[]} body Variable header plus payload.
   * @returns {number[]} Packet bytes.
   */
  buildPacket(firstByte, body) {
    return [firstByte & 0xFF].concat(this.encodeVariableByteInteger(body.length), body);
  }

  /**
   * Send the CONNECT packet.
   */
  sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, keepAlive,
    sessionExpiryInterval) {
    try {
      this.debugLog('发送 CONNECT 包,clean:', clean, 'cleanStart:', cleanStart, 'protocolVersion:',
        protocolVersion);

      const isV5 = protocolVersion === 5;
      let flags = 0x00;
      if (isV5 ? cleanStart : clean) flags |= 0x02;
      if (username) flags |= 0x80;
      if (password) flags |= 0x40;

      // Protocol name "MQTT", protocol level, connect flags, keep-alive.
      let body = [
        0x00, 0x04, 0x4D, 0x51, 0x54, 0x54,
        protocolVersion & 0xFF,
        flags,
        (keepAlive >> 8) & 0xFF,
        keepAlive & 0xFF,
      ];

      if (isV5) {
        let properties = [];
        // Session Expiry Interval (0x11), four-byte big-endian integer.
        if (!cleanStart && sessionExpiryInterval !== undefined && sessionExpiryInterval !== null) {
          properties = [
            0x11,
            (sessionExpiryInterval >>> 24) & 0xFF,
            (sessionExpiryInterval >>> 16) & 0xFF,
            (sessionExpiryInterval >>> 8) & 0xFF,
            sessionExpiryInterval & 0xFF,
          ];
        }
        body = body.concat(this.encodeVariableByteInteger(properties.length), properties);
      }

      body = body.concat(this.utf8Field(clientId));
      if (username) body = body.concat(this.utf8Field(username));
      if (password) body = body.concat(this.utf8Field(password));

      this.sendPacket(this.buildPacket(0x10, body));
    } catch (error) {
      this.emit('error', new Error(`发送 CONNECT 包失败: ${error.message}`));
    }
  }

  /**
   * Send the DISCONNECT packet.
   */
  sendDisconnectPacket() {
    this.sendPacket([0xE0, 0x00]);
  }

  /**
   * Send a PUBLISH packet.
   * @throws {Error} When the payload is null/undefined or a field is too long.
   */
  sendPublishPacket(topic, message, qos) {
    const level = qos & 0x03;
    let body = this.utf8Field(topic);
    if (level > 0) {
      const packetId = this.nextPacketId();
      body = body.concat([(packetId >> 8) & 0xFF, packetId & 0xFF]);
    }
    if (this.config.protocolVersion === 5) {
      body.push(0x00); // property length = 0
    }
    // The payload is appended as raw bytes, never as text.
    body = body.concat(this.payloadToBytes(message));
    this.sendPacket(this.buildPacket(0x30 | (level << 1), body));
  }

  /**
   * Send a SUBSCRIBE packet for a single topic filter.
   */
  sendSubscribePacket(topic, qos) {
    const topicField = this.utf8Field(topic);
    const packetId = this.nextPacketId();
    let body = [(packetId >> 8) & 0xFF, packetId & 0xFF];
    if (this.config.protocolVersion === 5) {
      body.push(0x00); // property length = 0
    }
    body = body.concat(topicField, [qos & 0x03]);
    this.sendPacket(this.buildPacket(0x82, body));
  }

  /**
   * Send an UNSUBSCRIBE packet for a single topic filter.
   */
  sendUnsubscribePacket(topic) {
    const topicField = this.utf8Field(topic);
    const packetId = this.nextPacketId();
    let body = [(packetId >> 8) & 0xFF, packetId & 0xFF];
    if (this.config.protocolVersion === 5) {
      body.push(0x00); // property length = 0
    }
    body = body.concat(topicField);
    this.sendPacket(this.buildPacket(0xA2, body));
  }

  /**
   * Acknowledge an incoming QoS 1 publish.
   */
  sendPubackPacket(packetId) {
    try {
      this.sendPacket([0x40, 0x02, (packetId >> 8) & 0xFF, packetId & 0xFF]);
    } catch (error) {
      this.emit('error', new Error(`发送 PUBACK 失败: ${error.message}`));
    }
  }

  /**
   * Send PINGREQ and arm the response timeout. An already pending timeout is left
   * running so that a missing PINGRESP is detected even though pings repeat more
   * often than the timeout.
   */
  sendPingPacket() {
    try {
      this.debugLog('发送 PINGREQ 包');
      this.sendPacket([0xC0, 0x00]);

      const { keepAlive } = this.config;
      if (keepAlive > 0 && !this.pingTimeoutTimer) {
        const timeout = keepAlive * 1000 * 1.5;
        this.pingTimeoutTimer = setTimeout(() => {
          this.pingTimeoutTimer = null;
          this.debugLog('心跳响应超时,服务器未回复 PINGRESP');
          this.emit('error', new Error('心跳响应超时,服务器未回复 PINGRESP'));
          this.disconnect();
        }, timeout);
      }
    } catch (error) {
      this.emit('error', new Error(`发送 PINGREQ 失败: ${error.message}`));
    }
  }

  /**
   * Start sending PINGREQ at 80% of the keep-alive interval.
   */
  startPingInterval() {
    this.stopPingInterval();
    const { keepAlive } = this.config;
    if (!(keepAlive > 0)) {
      return;
    }
    const interval = keepAlive * 1000 * 0.8;
    this.debugLog('启动心跳定时器,间隔:', interval, 'ms');
    this.pingIntervalTimer = setInterval(() => {
      if (this.connectionStatus === 'connected') {
        this.sendPingPacket();
      }
    }, interval);
  }

  /**
   * Stop the ping interval and the pending PINGRESP timeout.
   */
  stopPingInterval() {
    if (this.pingIntervalTimer) {
      clearInterval(this.pingIntervalTimer);
      this.pingIntervalTimer = null;
    }
    if (this.pingTimeoutTimer) {
      clearTimeout(this.pingTimeoutTimer);
      this.pingTimeoutTimer = null;
    }
  }

  // ========== Internal: receiving ==========

  /**
   * Feed one WebSocket frame into the MQTT parser. A frame may carry several MQTT
   * packets or only part of one, so bytes are buffered until a packet is complete.
   * @param {*} data Frame payload (ArrayBuffer, typed array, byte array or binary string).
   */
  handleMQTTMessage(data) {
    try {
      const incoming = this.ab2buffer(data);
      if (incoming.length === 0) {
        return;
      }
      const pending = this.receiveBuffer || [];
      const buffer = pending.length > 0 ? pending.concat(incoming) : incoming;

      let offset = 0;
      while (offset < buffer.length) {
        const remaining = this.readVariableByteInteger(buffer, offset + 1);
        if (remaining === null) {
          break; // fixed header not complete yet
        }
        const packetEnd = remaining.next + remaining.value;
        if (packetEnd > buffer.length) {
          break; // body not complete yet
        }
        this.dispatchPacket(buffer.slice(offset, packetEnd), remaining.next - offset);
        offset = packetEnd;
      }
      this.receiveBuffer = buffer.slice(offset);
    } catch (error) {
      // The stream position is unknown after a malformed header; start clean.
      this.receiveBuffer = [];
      console.error('[MQTT调试] 处理 MQTT 消息时出错:', error);
      this.emit('error', new Error(`处理 MQTT 消息时出错: ${error.message}`));
    }
  }

  /**
   * Route one complete packet to its handler.
   * @param {number[]} packet Bytes of exactly one MQTT packet.
   * @param {number} pos Index of the first byte after the fixed header.
   */
  dispatchPacket(packet, pos) {
    const packetType = packet[0] & 0xF0;
    const flags = packet[0] & 0x0F;
    this.debugLog('数据包类型:', `0x${packetType.toString(16).padStart(2, '0')}`, '标志位:', flags);

    switch (packetType) {
      case 0x20:
        this.handleConnackPacket(packet, pos);
        break;
      case 0x30:
        this.handlePublishPacket(packet, pos, flags);
        break;
      case 0x40:
        this.handlePubackPacket(packet, pos);
        break;
      case 0x90:
        this.handleSubackPacket(packet, pos);
        break;
      case 0xB0:
        this.handleUnsubackPacket(packet, pos);
        break;
      case 0xD0:
        // PINGRESP: the broker is alive, cancel the response timeout.
        if (this.pingTimeoutTimer) {
          clearTimeout(this.pingTimeoutTimer);
          this.pingTimeoutTimer = null;
        }
        break;
      case 0xE0:
        this.handleDisconnectPacket(packet, pos);
        break;
      default:
        this.debugLog('未知数据包类型:', packetType.toString(16));
    }
  }

  /**
   * Handle CONNACK: on success mark the client connected, start pings and re-send
   * remembered subscriptions; otherwise report the broker's refusal.
   */
  handleConnackPacket(buffer, pos) {
    this.clearConnectTimeout();

    if (buffer.length < pos + 2) {
      this.emit('error', new Error('CONNACK 包长度不足'));
      return;
    }
    // buffer[pos] holds the acknowledge flags, buffer[pos + 1] the return/reason code.
    const reasonCode = buffer[pos + 1];

    if (reasonCode === 0) {
      this.reconnectOnClose = true;
      this.setConnectionStatus('connected');
      this.emit('connect');
      this.startPingInterval();

      Object.keys(this.subscribedTopics).forEach((topic) => {
        const qos = this.subscribedTopics[topic];
        try {
          this.sendSubscribePacket(topic, qos);
          this.debugLog(`重新订阅主题: ${topic}, QoS: ${qos}`);
        } catch (err) {
          console.error(`[MQTT调试] 重新订阅主题失败: ${topic}`, err);
        }
      });
      return;
    }

    // MQTT 3.1.1 return codes and MQTT 5.0 reason codes.
    const reasonCodes = {
      1: '不支持的协议版本',
      2: '客户端ID被拒绝',
      3: '服务器不可用',
      4: '用户名或密码错误',
      5: '未授权',
      128: '未指明的错误',
      129: '报文格式错误',
      130: '协议错误',
      131: '实现特定错误',
      132: '不支持的协议版本',
      133: '客户端ID无效',
      134: '用户名或密码错误',
      135: '未授权',
      136: '服务器不可用',
      137: '服务器繁忙',
      138: '已被禁止',
    };
    const reasonText = reasonCodes[reasonCode] || `未知错误 (${reasonCode})`;

    // A refused handshake will be refused again; do not keep retrying.
    this.reconnectOnClose = false;
    this.setConnectionStatus('error');
    this.emit('error', new Error(`MQTT 连接失败: ${reasonText}`));
  }

  /**
   * Handle an incoming PUBLISH: emit 'message' with the decoded topic and payload and
   * acknowledge QoS 1 deliveries.
   * @param {number[]} buffer Bytes of exactly one PUBLISH packet.
   * @param {number} pos Index of the first byte after the fixed header.
   * @param {number} flags Low nibble of the fixed-header byte.
   */
  handlePublishPacket(buffer, pos, flags) {
    try {
      const qos = (flags >> 1) & 0x03;
      let cursor = pos;

      if (buffer.length < cursor + 2) {
        this.emit('error', new Error('PUBLISH 包主题长度字段不完整'));
        return;
      }
      const topicLen = buffer[cursor] * 256 + buffer[cursor + 1];
      cursor += 2;

      if (buffer.length < cursor + topicLen) {
        this.emit('error', new Error('PUBLISH 包主题数据不完整'));
        return;
      }
      const topic = this.bytesToString(buffer.slice(cursor, cursor + topicLen));
      cursor += topicLen;

      let packetId = 0;
      if (qos > 0) {
        if (buffer.length < cursor + 2) {
          this.emit('error', new Error('PUBLISH 包包ID字段不完整'));
          return;
        }
        packetId = buffer[cursor] * 256 + buffer[cursor + 1];
        cursor += 2;
      }

      // MQTT 5.0 puts a property block between the variable header and the payload.
      if (this.config.protocolVersion === 5) {
        const properties = this.readVariableByteInteger(buffer, cursor);
        if (properties === null || properties.next + properties.value > buffer.length) {
          this.emit('error', new Error('PUBLISH 包属性字段不完整'));
          return;
        }
        cursor = properties.next + properties.value;
      }

      const payload = this.bytesToString(buffer.slice(cursor));
      this.debugLog('收到 PUBLISH,主题:', topic, 'QoS:', qos);

      this.emit('message', {
        type: 'receive',
        topic,
        payload,
      });

      if (qos === 1) {
        this.sendPubackPacket(packetId);
      }
    } catch (error) {
      console.error('[MQTT调试] 处理 PUBLISH 包时出错:', error);
      this.emit('error', new Error(`处理 PUBLISH 包时出错: ${error.message}`));
    }
  }

  /**
   * Handle PUBACK. Outgoing QoS 1 publishes are not tracked, so there is nothing to do.
   */
  handlePubackPacket(buffer, pos) {
    if (buffer.length >= pos + 2) {
      this.debugLog('收到 PUBACK,报文标识符:', buffer[pos] * 256 + buffer[pos + 1]);
    }
  }

  /**
   * Handle SUBACK: emit 'subscribed' with the packet identifier.
   */
  handleSubackPacket(buffer, pos) {
    if (buffer.length >= pos + 2) {
      this.emit('subscribed', buffer[pos] * 256 + buffer[pos + 1]);
    }
  }

  /**
   * Handle UNSUBACK: emit 'unsubscribed' with the packet identifier.
   */
  handleUnsubackPacket(buffer, pos) {
    if (buffer.length >= pos + 2) {
      this.emit('unsubscribed', buffer[pos] * 256 + buffer[pos + 1]);
    }
  }

  /**
   * Handle a DISCONNECT sent by the broker. No automatic reconnect follows.
   */
  handleDisconnectPacket(buffer, pos) {
    this.reconnectOnClose = false;
    this.setConnectionStatus('disconnected');
    this.emit('disconnect', {
      type: 'server',
    });
  }

  // ========== Utilities ==========

  /**
   * Convert supported binary representations into an array of byte values.
   * Strings are treated as binary strings (low 8 bits of each char code).
   * @returns {number[]} Byte array; empty when the conversion fails (an 'error' is emitted).
   */
  ab2buffer(data) {
    try {
      if (typeof data === 'string') {
        const bytes = [];
        for (let i = 0; i < data.length; i++) {
          bytes.push(data.charCodeAt(i) & 0xFF);
        }
        return bytes;
      }
      if (data instanceof ArrayBuffer) {
        return Array.from(new Uint8Array(data));
      }
      if (Array.isArray(data)) {
        return data;
      }
      return Array.from(data);
    } catch (error) {
      this.emit('error', new Error(`数据转换失败: ${error.message}`));
      return [];
    }
  }

  /**
   * Convert a publish payload into bytes.
   * @throws {Error} When the payload is null or undefined.
   */
  payloadToBytes(message) {
    if (message === undefined || message === null) {
      throw new Error('消息内容不能为空');
    }
    if (typeof message === 'string') {
      return this.stringToBytes(message);
    }
    if (message instanceof ArrayBuffer) {
      return Array.from(new Uint8Array(message));
    }
    if (ArrayBuffer.isView(message)) {
      return Array.from(new Uint8Array(message.buffer, message.byteOffset, message.byteLength));
    }
    if (Array.isArray(message)) {
      return message.map((value) => value & 0xFF);
    }
    return this.stringToBytes(String(message));
  }

  /**
   * Decode a variable byte integer (used for remaining length and property length).
   * @param {number[]} buffer Source bytes.
   * @param {number} pos Index of the first encoded byte.
   * @returns {{value: number, next: number}|null} Decoded value and the index after it,
   *   or null when the buffer ends before the integer is complete.
   * @throws {Error} When the encoding is longer than four bytes.
   */
  readVariableByteInteger(buffer, pos) {
    let value = 0;
    let multiplier = 1;
    for (let count = 0; count < 4; count++) {
      const index = pos + count;
      if (index >= buffer.length) {
        return null;
      }
      const encodedByte = buffer[index];
      value += (encodedByte & 0x7F) * multiplier;
      if ((encodedByte & 0x80) === 0) {
        return { value, next: index + 1 };
      }
      multiplier *= 128;
    }
    throw new Error('可变长度整数编码超过 4 字节');
  }

  /**
   * Encode a variable byte integer.
   * @param {number} value Integer in the range 0..268435455.
   * @returns {number[]} One to four bytes.
   * @throws {Error} When the value is out of range.
   */
  encodeVariableByteInteger(value) {
    if (!(value >= 0 && value <= 268435455)) {
      throw new Error(`长度超出范围: ${value}`);
    }
    const bytes = [];
    let rest = Math.floor(value);
    do {
      let byte = rest % 128;
      rest = Math.floor(rest / 128);
      if (rest > 0) {
        byte |= 0x80;
      }
      bytes.push(byte);
    } while (rest > 0);
    return bytes;
  }

  /**
   * Encode a string as an MQTT UTF-8 field: two-byte big-endian length, then the bytes.
   * @returns {number[]} Field bytes.
   * @throws {Error} When the encoded string is longer than 65535 bytes.
   */
  utf8Field(str) {
    const bytes = this.stringToBytes(str);
    if (bytes.length > 0xFFFF) {
      throw new Error('字符串长度超过 65535 字节');
    }
    return [(bytes.length >> 8) & 0xFF, bytes.length & 0xFF].concat(bytes);
  }

  /**
   * Decode UTF-8 bytes into a string. Uses TextDecoder when available, otherwise a
   * manual decoder that skips invalid bytes.
   * @returns {string} Decoded text; '' when the input is empty or decoding fails.
   */
  bytesToString(bytes) {
    try {
      if (!bytes || bytes.length === 0) return '';

      if (typeof TextDecoder !== 'undefined') {
        try {
          return new TextDecoder('utf-8').decode(new Uint8Array(bytes));
        } catch (decodeError) {
          console.warn('TextDecoder 解码失败,回退到手动解码:', decodeError);
        }
      }

      let result = '';
      let i = 0;
      while (i < bytes.length) {
        const lead = bytes[i];
        let length = 0;
        let codePoint = 0;
        if (lead < 0x80) {
          length = 1;
          codePoint = lead;
        } else if ((lead & 0xE0) === 0xC0) {
          length = 2;
          codePoint = lead & 0x1F;
        } else if ((lead & 0xF0) === 0xE0) {
          length = 3;
          codePoint = lead & 0x0F;
        } else if ((lead & 0xF8) === 0xF0) {
          length = 4;
          codePoint = lead & 0x07;
        }

        let valid = length > 0 && i + length <= bytes.length;
        for (let k = 1; valid && k < length; k++) {
          const continuation = bytes[i + k];
          if ((continuation & 0xC0) === 0x80) {
            codePoint = (codePoint << 6) | (continuation & 0x3F);
          } else {
            valid = false;
          }
        }

        if (!valid) {
          i++; // invalid sequence: skip one byte and resynchronise
          continue;
        }

        if (codePoint > 0xFFFF) {
          const offset = codePoint - 0x10000;
          result += String.fromCharCode(0xD800 + ((offset >> 10) & 0x3FF), 0xDC00 + (offset & 0x3FF));
        } else {
          result += String.fromCharCode(codePoint);
        }
        i += length;
      }
      return result;
    } catch (error) {
      console.error('字节转字符串失败:', error);
      this.emit('error', new Error(`字节转字符串失败: ${error.message}`));
      return '';
    }
  }

  /**
   * Encode a string as an MQTT UTF-8 field and return it as a binary string
   * (one character per byte).
   */
  encodeString(str) {
    return this.utf8Field(str)
      .map((byte) => String.fromCharCode(byte))
      .join('');
  }

  /**
   * Encode a string as UTF-8. Unpaired surrogates become U+FFFD; null and undefined
   * encode as the empty string.
   * @returns {number[]} UTF-8 bytes.
   */
  stringToBytes(str) {
    const text = str === undefined || str === null ? '' : String(str);
    const bytes = [];
    for (let i = 0; i < text.length; i++) {
      let code = text.charCodeAt(i);

      // Combine a valid surrogate pair into one code point.
      if (code >= 0xD800 && code <= 0xDBFF && i + 1 < text.length) {
        const low = text.charCodeAt(i + 1);
        if (low >= 0xDC00 && low <= 0xDFFF) {
          code = 0x10000 + ((code - 0xD800) << 10) + (low - 0xDC00);
          i++;
        }
      }
      if (code >= 0xD800 && code <= 0xDFFF) {
        code = 0xFFFD;
      }

      if (code <= 0x7F) {
        bytes.push(code);
      } else if (code <= 0x7FF) {
        bytes.push(0xC0 | (code >> 6), 0x80 | (code & 0x3F));
      } else if (code <= 0xFFFF) {
        bytes.push(0xE0 | (code >> 12), 0x80 | ((code >> 6) & 0x3F), 0x80 | (code & 0x3F));
      } else {
        bytes.push(
          0xF0 | (code >> 18),
          0x80 | ((code >> 12) & 0x3F),
          0x80 | ((code >> 6) & 0x3F),
          0x80 | (code & 0x3F)
        );
      }
    }
    return bytes;
  }

  /**
   * Encode the remaining-length field and return it as a binary string.
   */
  encodeRemainingLength(length) {
    return this.encodeVariableByteInteger(length)
      .map((byte) => String.fromCharCode(byte))
      .join('');
  }
}

// Export the class
export default MQTTClient;