diff --git a/pages/home-page/policy-Config/index.vue b/pages/home-page/policy-Config/index.vue index eac4c0a..4e62d40 100644 --- a/pages/home-page/policy-Config/index.vue +++ b/pages/home-page/policy-Config/index.vue @@ -42,9 +42,6 @@ {{ $t("homePage.mine.submit") }} - - - @@ -55,13 +52,9 @@ import { Langlist } from "@/common/lang"; - import pako from "pako"; - import mqttClient from '@/uni_modules/yh-wsmqtt/wsmqtt/components/mqtt-client.vue' - import * as Paho from "@/static/lib/mqtt.js"; + import MQTTClient from "@/static/lib/mqtt.js"; export default { - components: { - mqttClient - }, + data() { return { formList: formList, @@ -72,47 +65,156 @@ mqttClient: null, backData: {}, smallArr: [], - mqttOptions: { + mqttConfig: { host: '13.39.200.14', port: 8083, - path: '/mqtt' - } + clientId: 'mqttx_' + Date.now(), + username: 'admin', + password: 'zzkj@688737', + path: 'mqtt', + protocolVersion: 4, // 4 for MQTT 3.1.1 + clean: false, + cleanStart: false, // for MQTT 5.0 + keepAlive: 60, + sessionExpiryInterval: 4294967295, + connectTimeout: 10000, + reconnectPeriod: 3000 + + }, + }; }, computed: { language() { return this.$store.state.vuex_language - } - }, - watch: { - language: { - handler(val) { - // console.log(val) - // this.lang.value = Langlist.find(v => v.prop == val) - // console.log(this.lang) - }, - immediate: true, - deep: true, + }, + currentStation() { + console.log(this.vuex_currentStation) + return this.vuex_currentStation; }, }, + onShow() { - // if (!this.mqttClient) { - // this.initMQTT() - // } - }, - beforeDestroy() { - // if (this.mqttClient) { - // this.mqttClient.disconnect() - // this.mqttClient = null - // } + this.connect() }, + methods: { - onConnect() { - console.log('连接成功') + initMQTTClient() { + this.mqttClient = new MQTTClient(this.mqttConfig) + console.log(this.mqttClient); + // 监听连接事件 + this.mqttClient.on('connect', () => { + console.log('MQTT 连接成功') + this.publish({ + topic: '/test', + message: { + fun: 'GET', + type: 'WJ_Get_NewControlSystem', + content: 0 + }, + qos: 0 + }) + }) + + // 监听断开事件 + this.mqttClient.on('disconnect', (res) => { + this.connectionStatus = 'disconnected' + console.log('连接已关闭') + }) + + // 监听错误事件 + this.mqttClient.on('error', (error) => { + console.log(error) + }) + + // 监听状态变化 + this.mqttClient.on('statusChange', (status) => { + console.log(status) + }) + + // 监听消息事件 + this.mqttClient.on('message', (msg) => { + console.log(msg) + }) + + // 监听订阅成功事件 + this.mqttClient.on('subscribed', (packetId) => { + this.addLog('success', `订阅成功 (包ID: ${packetId})`) + uni.showToast({ + title: '订阅成功', + icon: 'success' + }) + }) + + // 监听取消订阅成功事件 + this.mqttClient.on('unsubscribed', (packetId) => { + this.addLog('success', `取消订阅成功 (包ID: ${packetId})`) + uni.showToast({ + title: '取消订阅成功', + icon: 'success' + }) + }) }, - onMessage(msg) { - console.log('收到消息:', msg) + + + publish(publishConfig) { + const { + topic, + message, + qos + } = publishConfig + + if (!topic) { + uni.showToast({ + title: '请输入主题', + icon: 'none' + }) + return + } + + if (this.mqttClient) { + this.mqttClient.publish(topic, message, qos) + } }, + + subscribe() { + + if (this.mqttClient) { + this.mqttClient.subscribe({ + topic: '/test', + qos: 0 + }) + } + }, + + connect() { + const { + host + } = this.mqttConfig + + if (!host) { + uni.showToast({ + title: '请输入主机地址', + icon: 'none' + }) + return + } + + console.log('开始连接 MQTT 服务器...') + + + // 如果客户端未初始化,重新初始化 + if (!this.mqttClient) { + this.initMQTTClient() + } + this.mqttClient.reSetconstructor(this.mqttConfig) + + this.mqttClient.connect() + }, + + + + decodeGzipBase64(base64Str) { // 1. base64 → Uint8Array const binaryStr = atob(base64Str); @@ -186,77 +288,7 @@ console.log('handleInput方法错误:', error); } }, - initMQTT() { - // 创建MQTT客户端实例 - const clientId = 'client_' + Math.random().toString(36).substr(2, 9); - // 使用WebSocket连接方式 - this.mqttClient = new Paho.Client('13.39.200.14', 8083, clientId); - // 设置回调函数 - this.mqttClient.onConnectionLost = this.onConnectionLost; - this.mqttClient.onMessageArrived = this.onMessageArrived; - - // 连接选项 - const options = { - onSuccess: this.onConnect, - onFailure: this.onFail, - userName: 'admin', - password: 'zzkj@688737', - useSSL: false, - timeout: 30, - keepAliveInterval: 60, - cleanSession: true - }; - - // 连接到MQTT broker - this.mqttClient.connect(options); - }, - onConnect() { - console.log('Connected to MQTT broker!'); - // 订阅主题 - this.mqttClient.subscribe('WJ_Get_NewControlSystem'); - // 发送获取数据的消息 - this.sendMQTTMessage('WJ_Get_NewControlSystem', JSON.stringify({ - fun: 'GET', - type: 'WJ_Get_NewControlSystem', - content: 0 - })); - }, - onFail(error) { - console.error('Failed to connect:', error); - }, - onConnectionLost(responseObject) { - if (responseObject.errorCode !== 0) { - console.log('Connection lost: ' + responseObject.errorMessage); - } - }, - onMessageArrived(message) { - console.log('Message arrived:', message.payloadString); - try { - // let data = JSON.parse(message.payloadString); - // if (data.type == 'WJ_Get_NewControlSystem') { - // this.control(data.content); - // } - } catch (error) { - console.log('解析MQTT消息错误:', error); - } - }, - sendMQTTMessage(topic, payload) { - if (this.mqttClient && this.mqttClient.isConnected()) { - const message = new Paho.Message(payload); - message.destinationName = topic; - this.mqttClient.send(message); - console.log('MQTT消息发送成功:', payload); - } else { - console.log('MQTT客户端未连接'); - if (!this.mqttClient) { - this.initMQTT(); - } - setTimeout(() => { - this.sendMQTTMessage(topic, payload); - }, 1000); - } - }, toback() { uni.navigateBack() }, @@ -269,17 +301,22 @@ pre[cur.prop] = Number(pre[cur.prop]) return pre }, {}) - this.sendMQTTMessage("WJ_Set_NewControlSystem", JSON.stringify({ - fun: 'SET', - type: 'WJ_Set_NewControlSystem', - content: { - ...this.backData, - params: { - ...this.backData.params, - ...smallParam + this.publish({ + topic: '/test', + message: { + fun: 'SET', + type: 'WJ_Set_NewControlSystem', + content: { + ...this.backData, + params: { + ...this.backData.params, + ...smallParam + } } - } - })) + }, + qos: 0 + }) + }, }, }; diff --git a/static/lib/mqtt.js b/static/lib/mqtt.js index e53be67..fbb68d3 100644 --- a/static/lib/mqtt.js +++ b/static/lib/mqtt.js @@ -1,75 +1,1065 @@ -/******************************************************************************* - * Copyright (c) 2013, 2016 IBM Corp. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Eclipse Distribution License v1.0 which accompany this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * and the Eclipse Distribution License is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - *******************************************************************************/ -(function(t,q){"object"===typeof exports&&"object"===typeof module?module.exports=q():"function"===typeof define&&define.amd?define(q):"object"===typeof exports?exports=q():t.Paho=q()})(this,function(){return function(t){function q(a,b,c){b[c++]=a>>8;b[c++]=a%256;return c}function r(a,b,c,k){k=q(b,c,k);E(a,c,k);return k+b}function n(a){for(var b=0,c=0;c=k&&(c++,b++),b+=3):127=e){var g=a.charCodeAt(++k);if(isNaN(g))throw Error(f(h.MALFORMED_UNICODE,[e,g]));e=(e-55296<<10)+(g-56320)+65536}127>=e?b[c++]=e:(2047>=e?b[c++]=e>>6&31|192:(65535>=e?b[c++]=e>>12&15|224:(b[c++]=e>>18&7|240,b[c++]=e>>12&63|128),b[c++]=e>>6&63|128),b[c++]=e&63|128)}return b}function F(a,b,c){for(var k="",e,g=b;ge)){var m=a[g++]-128;if(0>m)throw Error(f(h.MALFORMED_UTF,[e.toString(16),m.toString(16),""]));if(224>e)e=64*(e-192)+m;else{var d= -a[g++]-128;if(0>d)throw Error(f(h.MALFORMED_UTF,[e.toString(16),m.toString(16),d.toString(16)]));if(240>e)e=4096*(e-224)+64*m+d;else{var l=a[g++]-128;if(0>l)throw Error(f(h.MALFORMED_UTF,[e.toString(16),m.toString(16),d.toString(16),l.toString(16)]));if(248>e)e=262144*(e-240)+4096*m+64*d+l;else throw Error(f(h.MALFORMED_UTF,[e.toString(16),m.toString(16),d.toString(16),l.toString(16)]));}}}65535>10)),e=56320+(e&1023));k+=String.fromCharCode(e)}return k} -var s=t.localStorage||function(){var a={};return{setItem:function(b,c){a[b]=c},getItem:function(b){return a[b]},removeItem:function(b){delete a[b]}}}(),A=function(a,b){for(var c in a)if(a.hasOwnProperty(c))if(b.hasOwnProperty(c)){if(typeof a[c]!==b[c])throw Error(f(h.INVALID_TYPE,[typeof a[c],c]));}else{c="Unknown property, "+c+". Valid properties are:";for(var k in b)b.hasOwnProperty(k)&&(c=c+" "+k);throw Error(c);}},u=function(a,b){return function(){return a.apply(b,arguments)}},h={OK:{code:0,text:"AMQJSC0000I OK."}, -CONNECT_TIMEOUT:{code:1,text:"AMQJSC0001E Connect timed out."},SUBSCRIBE_TIMEOUT:{code:2,text:"AMQJS0002E Subscribe timed out."},UNSUBSCRIBE_TIMEOUT:{code:3,text:"AMQJS0003E Unsubscribe timed out."},PING_TIMEOUT:{code:4,text:"AMQJS0004E Ping timed out."},INTERNAL_ERROR:{code:5,text:"AMQJS0005E Internal error. Error Message: {0}, Stack trace: {1}"},CONNACK_RETURNCODE:{code:6,text:"AMQJS0006E Bad Connack return code:{0} {1}."},SOCKET_ERROR:{code:7,text:"AMQJS0007E Socket error:{0}."},SOCKET_CLOSE:{code:8, -text:"AMQJS0008I Socket closed."},MALFORMED_UTF:{code:9,text:"AMQJS0009E Malformed UTF data:{0} {1} {2}."},UNSUPPORTED:{code:10,text:"AMQJS0010E {0} is not supported by this browser."},INVALID_STATE:{code:11,text:"AMQJS0011E Invalid state {0}."},INVALID_TYPE:{code:12,text:"AMQJS0012E Invalid type {0} for {1}."},INVALID_ARGUMENT:{code:13,text:"AMQJS0013E Invalid argument {0} for {1}."},UNSUPPORTED_OPERATION:{code:14,text:"AMQJS0014E Unsupported operation."},INVALID_STORED_DATA:{code:15,text:"AMQJS0015E Invalid data in local storage key\x3d{0} value\x3d{1}."}, -INVALID_MQTT_MESSAGE_TYPE:{code:16,text:"AMQJS0016E Invalid MQTT message type {0}."},MALFORMED_UNICODE:{code:17,text:"AMQJS0017E Malformed Unicode string:{0} {1}."},BUFFER_FULL:{code:18,text:"AMQJS0018E Message buffer is full, maximum buffer size: {0}."}},H={0:"Connection Accepted",1:"Connection Refused: unacceptable protocol version",2:"Connection Refused: identifier rejected",3:"Connection Refused: server unavailable",4:"Connection Refused: bad user name or password",5:"Connection Refused: not authorized"}, -f=function(a,b){var c=a.text;if(b)for(var k,e,g=0;g>7;0d);f=g.length+1;b=new ArrayBuffer(b+f);d=new Uint8Array(b);d[0]=a;d.set(g,1);if(3==this.type)f=r(this.payloadMessage.destinationName, -k,d,f);else if(1==this.type){switch(this.mqttVersion){case 3:d.set(B,f);f+=B.length;break;case 4:d.set(C,f),f+=C.length}a=0;this.cleanSession&&(a=2);void 0!==this.willMessage&&(a=a|4|this.willMessage.qos<<3,this.willMessage.retained&&(a|=32));void 0!==this.userName&&(a|=128);void 0!==this.password&&(a|=64);d[f++]=a;f=q(this.keepAliveInterval,d,f)}void 0!==this.messageIdentifier&&(f=q(this.messageIdentifier,d,f));switch(this.type){case 1:f=r(this.clientId,n(this.clientId),d,f);void 0!==this.willMessage&& -(f=r(this.willMessage.destinationName,n(this.willMessage.destinationName),d,f),f=q(e.byteLength,d,f),d.set(e,f),f+=e.byteLength);void 0!==this.userName&&(f=r(this.userName,n(this.userName),d,f));void 0!==this.password&&r(this.password,n(this.password),d,f);break;case 3:d.set(h,f);break;case 8:for(g=0;gthis.disconnectedBufferSize)throw Error(f(h.BUFFER_FULL,[this.disconnectedBufferSize]));0this.connectOptions.mqttVersion?new WebSocket(a,["mqttv3.1"]):new WebSocket(a,["mqtt"]);this.socket.binaryType="arraybuffer";this.socket.onopen=u(this._on_socket_open,this);this.socket.onmessage=u(this._on_socket_message,this);this.socket.onerror=u(this._on_socket_error,this);this.socket.onclose=u(this._on_socket_close, -this);this.sendPinger=new G(this,this.connectOptions.keepAliveInterval);this.receivePinger=new G(this,this.connectOptions.keepAliveInterval);this._connectTimeout&&(this._connectTimeout.cancel(),this._connectTimeout=null);this._connectTimeout=new v(this,this.connectOptions.timeout,this._disconnected,[h.CONNECT_TIMEOUT.code,f(h.CONNECT_TIMEOUT)])};d.prototype._schedule_message=function(a){this._msg_queue.unshift(a);this.connected&&this._process_queue()};d.prototype.store=function(a,b){var c={type:b.type, -messageIdentifier:b.messageIdentifier,version:1};switch(b.type){case 3:b.pubRecReceived&&(c.pubRecReceived=!0);c.payloadMessage={};for(var d="",e=b.payloadMessage.payloadBytes,g=0;g=e[g]?d+"0"+e[g].toString(16):d+e[g].toString(16);c.payloadMessage.payloadHex=d;c.payloadMessage.qos=b.payloadMessage.qos;c.payloadMessage.destinationName=b.payloadMessage.destinationName;b.payloadMessage.duplicate&&(c.payloadMessage.duplicate=!0);b.payloadMessage.retained&&(c.payloadMessage.retained= -!0);0===a.indexOf("Sent:")&&(void 0===b.sequence&&(b.sequence=++this._sequence),c.sequence=b.sequence);break;default:throw Error(f(h.INVALID_STORED_DATA,[a+this._localKey+b.messageIdentifier,c]));}s.setItem(a+this._localKey+b.messageIdentifier,JSON.stringify(c))};d.prototype.restore=function(a){var b=s.getItem(a),c=JSON.parse(b),d=new p(c.type,c);switch(c.type){case 3:for(var b=c.payloadMessage.payloadHex,e=new ArrayBuffer(b.length/2),e=new Uint8Array(e),g=0;2<=b.length;){var m=parseInt(b.substring(0, -2),16),b=b.substring(2,b.length);e[g++]=m}b=new w(e);b.qos=c.payloadMessage.qos;b.destinationName=c.payloadMessage.destinationName;c.payloadMessage.duplicate&&(b.duplicate=!0);c.payloadMessage.retained&&(b.retained=!0);d.payloadMessage=b;break;default:throw Error(f(h.INVALID_STORED_DATA,[a,b]));}0===a.indexOf("Sent:"+this._localKey)?(d.payloadMessage.duplicate=!0,this._sentMessages[d.messageIdentifier]=d):0===a.indexOf("Received:"+this._localKey)&&(this._receivedMessages[d.messageIdentifier]=d)}; -d.prototype._process_queue=function(){for(var a=null;a=this._msg_queue.pop();)this._socket_send(a),this._notify_msg_sent[a]&&(this._notify_msg_sent[a](),delete this._notify_msg_sent[a])};d.prototype._requires_ack=function(a){var b=Object.keys(this._sentMessages).length;if(b>this.maxMessageIdentifier)throw Error("Too many messages:"+b);for(;void 0!==this._sentMessages[this._message_identifier];)this._message_identifier++;a.messageIdentifier=this._message_identifier;this._sentMessages[a.messageIdentifier]= -a;3===a.type&&this.store("Sent:",a);this._message_identifier===this.maxMessageIdentifier&&(this._message_identifier=1)};d.prototype._on_socket_open=function(){var a=new p(1,this.connectOptions);a.clientId=this.clientId;this._socket_send(a)};d.prototype._on_socket_message=function(a){this._trace("Client._on_socket_message",a.data);a=this._deframeMessages(a.data);for(var b=0;b>4,z=n&15,g=g+1,x=void 0,D=0,q=1;do{if(g==e.length){d=[null,m];break a}x=e[g++];D+=(x&127)*q;q*=128}while(0!==(x&128));x=g+D;if(x>e.length)d=[null,m];else{var y=new p(l);switch(l){case 2:e[g++]&1&&(y.sessionPresent=!0);y.returnCode=e[g++];break;case 3:var m=z>>1&3,s=256*e[g]+e[g+1],g=g+2,t=F(e,g,s), -g=g+s;0this._reconnectInterval&&(this._reconnectInterval*=2),this.connectOptions.uris?(this.hostIndex=0,this._doConnect(this.connectOptions.uris[0])):this._doConnect(this.uri))};d.prototype._disconnected=function(a,b){this._trace("Client._disconnected",a,b);if(void 0!==a&&this._reconnecting)this._reconnectTimeout=new v(this,this._reconnectInterval,this._reconnect);else if(this.sendPinger.cancel(),this.receivePinger.cancel(),this._connectTimeout&& -(this._connectTimeout.cancel(),this._connectTimeout=null),this._msg_queue=[],this._buffered_msg_queue=[],this._notify_msg_sent={},this.socket&&(this.socket.onopen=null,this.socket.onmessage=null,this.socket.onerror=null,this.socket.onclose=null,1===this.socket.readyState&&this.socket.close(),delete this.socket),this.connectOptions.uris&&this.hostIndexb)throw Error(f(h.INVALID_TYPE,[typeof b,"port"]));if("string"!==typeof c)throw Error(f(h.INVALID_TYPE,[typeof c,"path"]));e="ws://"+(-1!==a.indexOf(":")&&"["!==a.slice(0,1)&&"]"!==a.slice(-1)?"["+a+"]":a)+":"+b+c}for(var m=g=0;m=n&&m++;g++}if("string"!==typeof k||65535a.mqttVersion)throw Error(f(h.INVALID_ARGUMENT,[a.mqttVersion,"connectOptions.mqttVersion"]));void 0===a.mqttVersion?(a.mqttVersionExplicit=!1,a.mqttVersion=4):a.mqttVersionExplicit=!0;if(void 0!==a.password&&void 0===a.userName)throw Error(f(h.INVALID_ARGUMENT,[a.password,"connectOptions.password"]));if(a.willMessage){if(!(a.willMessage instanceof -w))throw Error(f(h.INVALID_TYPE,[a.willMessage,"connectOptions.willMessage"]));a.willMessage.stringPayload=null;if("undefined"===typeof a.willMessage.destinationName)throw Error(f(h.INVALID_TYPE,[typeof a.willMessage.destinationName,"connectOptions.willMessage.destinationName"]));}"undefined"===typeof a.cleanSession&&(a.cleanSession=!0);if(a.hosts){if(!(a.hosts instanceof Array))throw Error(f(h.INVALID_ARGUMENT,[a.hosts,"connectOptions.hosts"]));if(1>a.hosts.length)throw Error(f(h.INVALID_ARGUMENT, -[a.hosts,"connectOptions.hosts"]));for(var b=!1,d=0;da.ports[d])throw Error(f(h.INVALID_TYPE,[typeof a.ports[d],"connectOptions.ports["+d+"]"]));var b=a.hosts[d],g=a.ports[d];e="ws://"+(-1!==b.indexOf(":")?"["+b+"]":b)+":"+g+c;a.uris.push(e)}}}l.connect(a)}; -this.subscribe=function(a,b){if("string"!==typeof a&&a.constructor!==Array)throw Error("Invalid argument:"+a);b=b||{};A(b,{qos:"number",invocationContext:"object",onSuccess:"function",onFailure:"function",timeout:"number"});if(b.timeout&&!b.onFailure)throw Error("subscribeOptions.timeout specified with no onFailure callback.");if("undefined"!==typeof b.qos&&0!==b.qos&&1!==b.qos&&2!==b.qos)throw Error(f(h.INVALID_ARGUMENT,[b.qos,"subscribeOptions.qos"]));l.subscribe(a,b)};this.unsubscribe=function(a, -b){if("string"!==typeof a&&a.constructor!==Array)throw Error("Invalid argument:"+a);b=b||{};A(b,{invocationContext:"object",onSuccess:"function",onFailure:"function",timeout:"number"});if(b.timeout&&!b.onFailure)throw Error("unsubscribeOptions.timeout specified with no onFailure callback.");l.unsubscribe(a,b)};this.send=function(a,b,c,d){var e;if(0===arguments.length)throw Error("Invalid argument.length");if(1==arguments.length){if(!(a instanceof w)&&"string"!==typeof a)throw Error("Invalid argument:"+ -typeof a);e=a;if("undefined"===typeof e.destinationName)throw Error(f(h.INVALID_ARGUMENT,[e.destinationName,"Message.destinationName"]));}else e=new w(b),e.destinationName=a,3<=arguments.length&&(e.qos=c),4<=arguments.length&&(e.retained=d);l.send(e)};this.publish=function(a,b,c,d){var e;if(0===arguments.length)throw Error("Invalid argument.length");if(1==arguments.length){if(!(a instanceof w)&&"string"!==typeof a)throw Error("Invalid argument:"+typeof a);e=a;if("undefined"===typeof e.destinationName)throw Error(f(h.INVALID_ARGUMENT, -[e.destinationName,"Message.destinationName"]));}else e=new w(b),e.destinationName=a,3<=arguments.length&&(e.qos=c),4<=arguments.length&&(e.retained=d);l.send(e)};this.disconnect=function(){l.disconnect()};this.getTraceLog=function(){return l.getTraceLog()};this.startTrace=function(){l.startTrace()};this.stopTrace=function(){l.stopTrace()};this.isConnected=function(){return l.connected}},Message:w}}("undefined"!==typeof global?global:"undefined"!==typeof self?self:"undefined"!==typeof window?window: -{})}); \ No newline at end of file +/** + * MQTT 客户端插件 for uni-app + * 基于 WebSocket 实现 MQTT 协议 + */ +class MQTTClient { + /** + * 构造函数 + * @param {Object} options 配置选项 + */ + constructor(options = {}) { + + this.reSetconstructor(options); + // 事件监听器 + this.eventListeners = { + connect: [], + disconnect: [], + error: [], + message: [], + statusChange: [] + }; + + // 绑定方法 + this.on = this.on.bind(this); + this.off = this.off.bind(this); + this.connect = this.connect.bind(this); + this.disconnect = this.disconnect.bind(this); + this.publish = this.publish.bind(this); + this.subscribe = this.subscribe.bind(this); + this.unsubscribe = this.unsubscribe.bind(this); + } + + reSetconstructor(options = {}) { + // 默认配置 + this.config = { + path: '/ws', + protocolVersion: 4, // 4 for MQTT 3.1.1 + clean: false, + cleanStart: false, // for MQTT 5.0 + keepAlive: 60, + sessionExpiryInterval: 4294967295, + connectTimeout: 10000, + reconnectPeriod: 10000 + }; + + // 合并用户配置 + Object.assign(this.config, options); + + // 状态 + this.socket = null; + this.connectionStatus = 'disconnected'; // 'disconnected', 'connecting', 'connected', 'error' + this.subscribedTopics = {}; + this.messageId = 1; + this.connectTimeoutTimer = null; + this.reconnectTimer = null; + this.pingIntervalTimer = null; + this.pingTimeoutTimer = null; + } + /** + * 添加事件监听器 + * @param {string} event 事件名称: 'connect', 'disconnect', 'error', 'message', 'statusChange' + * @param {Function} callback 回调函数 + */ + on(event, callback) { + if (this.eventListeners[event]) { + this.eventListeners[event].push(callback); + } + } + + /** + * 移除事件监听器 + * @param {string} event 事件名称 + * @param {Function} callback 回调函数 + */ + off(event, callback) { + if (this.eventListeners[event]) { + const index = this.eventListeners[event].indexOf(callback); + if (index > -1) { + this.eventListeners[event].splice(index, 1); + } + } + } + + /** + * 触发事件 + * @param {string} event 事件名称 + * @param {...any} args 参数 + */ + emit(event, ...args) { + if (this.eventListeners[event]) { + this.eventListeners[event].forEach(callback => { + try { + callback(...args); + } catch (err) { + console.error(`Error in ${event} event listener:`, err); + } + }); + } + } + + /** + * 更新连接状态 + * @param {string} status 新状态 + */ + setConnectionStatus(status) { + if (this.connectionStatus !== status) { + this.connectionStatus = status; + this.emit('statusChange', status); + } + } + + /** + * 连接 MQTT 服务器 + */ + connect() { + const { + host, + port, + path, + clientId, + username, + password, + protocolVersion, + clean, + cleanStart, + keepAlive, + sessionExpiryInterval, + connectTimeout + } = this.config; + + console.log("连接参数:", this.config) + if (!host) { + this.emit('error', new Error('请输入主机地址')); + return; + } + + this.setConnectionStatus('connecting'); + this.emit('statusChange', 'connecting'); + + // 构建 WebSocket URL + let urlPath = path; + if (!urlPath.startsWith('/')) { + urlPath = '/' + urlPath; + } + let url = `ws://${host}:${port}${urlPath}`; + + // 关闭现有连接 + if (this.socket) { + this.socket.close(); + this.socket = null; + } + + const ver = protocolVersion === 5 ? '5.0' : protocolVersion === 4 ? '3.1.1' : '3.1'; + // 建立 WebSocket 连接 + this.socket = uni.connectSocket({ + url, + protocols: ['mqtt', 'mqttv3.1.1'], + header: { + 'content-type': 'application/octet-stream' + }, + success: () => { + // 连接请求已发送 + }, + fail: (err) => { + this.setConnectionStatus('error'); + this.emit('error', new Error(`WebSocket 连接失败: ${JSON.stringify(err)}`)); + } + }); + + // 设置连接超时 + if (connectTimeout > 0) { + this.connectTimeoutTimer = setTimeout(() => { + if (this.connectionStatus === 'connecting') { + this.setConnectionStatus('error'); + this.emit('error', new Error(`连接超时 (${connectTimeout}ms)`)); + if (this.socket) { + this.socket.close(); + this.socket = null; + } + } + }, connectTimeout); + } + + // WebSocket 事件监听 + this.socket.onOpen(() => { + console.log('[MQTT调试] WebSocket 连接已打开'); + if (this.connectTimeoutTimer) { + clearTimeout(this.connectTimeoutTimer); + this.connectTimeoutTimer = null; + } + this.sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, + keepAlive, sessionExpiryInterval); + }); + + this.socket.onMessage((res) => { + this.handleMQTTMessage(res.data); + }); + + this.socket.onError((err) => { + if (this.connectTimeoutTimer) { + clearTimeout(this.connectTimeoutTimer); + this.connectTimeoutTimer = null; + } + this.stopPingInterval(); + this.setConnectionStatus('error'); + this.emit('error', new Error(`WebSocket 错误: ${JSON.stringify(err)}`)); + }); + + this.socket.onClose((res) => { + console.log('[MQTT调试] WebSocket 关闭,原因:', res); + if (this.connectTimeoutTimer) { + clearTimeout(this.connectTimeoutTimer); + this.connectTimeoutTimer = null; + } + this.stopPingInterval(); + + if (this.connectionStatus === 'connected' && this.config.reconnectPeriod > 0) { + this.reconnectTimer = setTimeout(() => { + this.connect(); + }, this.config.reconnectPeriod); + } + + this.setConnectionStatus('disconnected'); + this.emit('disconnect', res); + }); + } + + /** + * 断开连接 + */ + disconnect() { + if (this.connectTimeoutTimer) { + clearTimeout(this.connectTimeoutTimer); + this.connectTimeoutTimer = null; + } + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + // 清除心跳超时定时器 + if (this.pingTimeoutTimer) { + clearTimeout(this.pingTimeoutTimer); + this.pingTimeoutTimer = null; + } + this.stopPingInterval(); + + if (this.socket) { + this.sendDisconnectPacket(); + setTimeout(() => { + this.socket.close(); + this.socket = null; + this.setConnectionStatus('disconnected'); + this.subscribedTopics = {}; + }, 100); + } + } + + /** + * 发布消息 + * @param {string} topic 主题 + * @param {string} message 消息内容 + * @param {number} qos QoS 等级 (0, 1, 2) + */ + publish(topic, message, qos = 0) { + if (this.connectionStatus !== 'connected') { + this.emit('error', new Error('请先连接')); + return; + } + + if (!topic) { + this.emit('error', new Error('请输入主题')); + return; + } + + try { + this.sendPublishPacket(topic, message, qos); + this.emit('message', { + type: 'send', + topic, + message + }); + } catch (error) { + this.emit('error', new Error(`发布失败: ${error.message}`)); + } + } + + /** + * 订阅主题 + * @param {string} topic 主题 + * @param {number} qos QoS 等级 (0, 1, 2) + */ + subscribe(topic, qos = 0) { + if (this.connectionStatus !== 'connected') { + this.emit('error', new Error('请先连接')); + return; + } + + if (!topic) { + this.emit('error', new Error('请输入订阅主题')); + return; + } + + try { + this.sendSubscribePacket(topic, qos); + this.subscribedTopics[topic] = qos; + } catch (error) { + this.emit('error', new Error(`订阅失败: ${error.message}`)); + } + } + + /** + * 取消订阅 + * @param {string} topic 主题 + */ + unsubscribe(topic) { + if (this.connectionStatus !== 'connected') { + this.emit('error', new Error('请先连接')); + return; + } + + if (!topic) { + this.emit('error', new Error('请输入取消订阅的主题')); + return; + } + + try { + this.sendUnsubscribePacket(topic); + delete this.subscribedTopics[topic]; + } catch (error) { + this.emit('error', new Error(`取消订阅失败: ${error.message}`)); + } + } + + /** + * 获取当前连接状态 + * @returns {string} 连接状态 + */ + getStatus() { + return this.connectionStatus; + } + + /** + * 获取已订阅的主题列表 + * @returns {Array} 主题列表 + */ + getSubscribedTopics() { + return Object.keys(this.subscribedTopics); + } + + // ========== 内部方法 ========== + + /** + * 发送数据包 + * @param {string|ArrayBuffer|Array} packet 数据包 + */ + sendPacket(packet) { + if (!this.socket) { + return; + } + + console.log("sendPacket", packet) + let arrayBuffer; + if (typeof packet === 'string') { + const bytes = this.ab2buffer(packet); + arrayBuffer = new Uint8Array(bytes).buffer; + // 调试日志:记录发送的数据包 + console.log('MQTT 发送数据包(字符串):', packet.length, '字符,字节:', bytes.length); + console.log('数据包字节(前64字节):', bytes.slice(0, 64).map(b => b.toString(16).padStart(2, '0')).join(' ')); + } else if (packet instanceof ArrayBuffer) { + arrayBuffer = packet; + console.log('MQTT 发送数据包(ArrayBuffer):', arrayBuffer.byteLength, '字节'); + } else if (Array.isArray(packet)) { + arrayBuffer = new Uint8Array(packet).buffer; + console.log('MQTT 发送数据包(数组):', packet.length, '字节'); + } else { + this.emit('error', new Error('未知的数据包格式')); + return; + } + + uni.sendSocketMessage({ + data: arrayBuffer, + success: () => { + // 发送成功 + console.log('MQTT 数据包发送成功'); + }, + fail: (err) => { + console.error('MQTT 数据包发送失败:', err); + this.emit('error', new Error(`数据发送失败: ${JSON.stringify(err)}`)); + } + }); + } + + /** + * 处理 MQTT 消息 + * @param {*} data 原始数据 + */ + handleMQTTMessage(data) { + try { + console.log('[MQTT调试] 收到原始消息:', typeof data, data instanceof ArrayBuffer, data.byteLength || data + .length); + + const buffer = this.ab2buffer(data); + console.log('[MQTT调试] 转换后的 buffer 长度:', buffer.length); + if (buffer.length > 0) { + const hexStr = buffer.slice(0, Math.min(32, buffer.length)).map(b => b.toString(16).padStart(2, + '0')).join(' '); + console.log('[MQTT调试] buffer 前32字节(hex):', hexStr); + } + + if (buffer.length === 0) { + return; + } + + const packetType = buffer[0] & 0xF0; + const flags = buffer[0] & 0x0F; + + console.log('[MQTT调试] 数据包类型:', '0x' + packetType.toString(16).padStart(2, '0'), '标志位:', flags); + + let pos = 1; + let remainingLength = 0; + let multiplier = 1; + let encodedByte; + + do { + if (pos >= buffer.length) { + console.log('[MQTT调试] 剩余长度编码不完整'); + return; + } + encodedByte = buffer[pos]; + remainingLength += (encodedByte & 0x7F) * multiplier; + multiplier *= 128; + pos++; + } while ((encodedByte & 0x80) !== 0 && pos < buffer.length); + + console.log('[MQTT调试] 剩余长度:', remainingLength, '当前位置:', pos); + + if (packetType === 0x20) { + console.log('[MQTT调试] 处理 CONNACK 包'); + this.handleConnackPacket(buffer, pos); + } else if (packetType === 0x30 || packetType === 0x32) { + console.log('[MQTT调试] 处理 PUBLISH 包'); + this.handlePublishPacket(buffer, pos, flags); + } else if (packetType === 0x40) { + console.log('[MQTT调试] 处理 PUBACK 包'); + this.handlePubackPacket(buffer, pos); + } else if (packetType === 0x90) { + console.log('[MQTT调试] 处理 SUBACK 包'); + this.handleSubackPacket(buffer, pos); + } else if (packetType === 0xB0) { + console.log('[MQTT调试] 处理 UNSUBACK 包'); + this.handleUnsubackPacket(buffer, pos); + } else if (packetType === 0xD0) { + console.log('[MQTT调试] 处理 PINGRESP 包'); + // 清除心跳响应超时定时器 + if (this.pingTimeoutTimer) { + clearTimeout(this.pingTimeoutTimer); + this.pingTimeoutTimer = null; + console.log('[MQTT调试] 心跳响应超时定时器已清除'); + } + } else if (packetType === 0xE0) { + console.log('[MQTT调试] 处理 DISCONNECT 包'); + this.handleDisconnectPacket(buffer, pos); + } else { + console.log('[MQTT调试] 未知数据包类型:', packetType.toString(16)); + } + } catch (error) { + console.error('[MQTT调试] 处理 MQTT 消息时出错:', error); + this.emit('error', new Error(`处理 MQTT 消息时出错: ${error.message}`)); + } + } + + /** + * 处理 CONNACK 包 + */ + handleConnackPacket(buffer, pos) { + if (buffer.length < pos + 2) { + this.emit('error', new Error('CONNACK 包长度不足')); + return; + } + + const flags = buffer[pos++]; + const reasonCode = buffer[pos++]; + + const reasonCodes = { + 0: '成功', + 128: '协议版本错误', + 129: '客户端ID被拒绝', + 130: '服务器不可用', + 131: '用户名密码错误', + 132: '未授权', + 133: '服务器未找到', + 134: '服务器不可用', + 135: '用户名密码无效' + }; + + const reasonText = reasonCodes[reasonCode] || `未知错误 (${reasonCode})`; + + if (reasonCode === 0) { + console.log('[MQTT调试] CONNACK 成功,设置连接状态为 connected'); + this.setConnectionStatus('connected'); + this.emit('connect'); + this.startPingInterval(); + + // 重新订阅之前订阅的主题 + if (Object.keys(this.subscribedTopics).length > 0) { + console.log('[MQTT调试] 重新订阅之前订阅的主题'); + for (const topic in this.subscribedTopics) { + const qos = this.subscribedTopics[topic]; + try { + this.sendSubscribePacket(topic, qos); + console.log(`[MQTT调试] 重新订阅主题: ${topic}, QoS: ${qos}`); + } catch (err) { + console.error(`[MQTT调试] 重新订阅主题失败: ${topic}`, err); + } + } + } + } else { + this.setConnectionStatus('error'); + this.emit('error', new Error(`MQTT 连接失败: ${reasonText}`)); + } + } + + /** + * 处理 PUBLISH 包 + */ + handlePublishPacket(buffer, pos, flags) { + try { + console.log('[MQTT调试] 开始处理 PUBLISH 包,buffer 长度:', buffer.length, '起始位置:', pos, '标志位:', flags); + + const qos = (flags >> 1) & 0x03; + console.log('[MQTT调试] 解析出的 QoS:', qos); + + if (buffer.length < pos + 2) { + console.error('[MQTT调试] PUBLISH 包主题长度字段不完整'); + this.emit('error', new Error('PUBLISH 包主题长度字段不完整')); + return; + } + const topicLen = buffer[pos++] * 256 + buffer[pos++]; + console.log('[MQTT调试] 主题长度:', topicLen); + + if (buffer.length < pos + topicLen) { + console.error('[MQTT调试] PUBLISH 包主题数据不完整'); + this.emit('error', new Error('PUBLISH 包主题数据不完整')); + return; + } + const topicBytes = buffer.slice(pos, pos + topicLen); + const topicHex = topicBytes.slice(0, Math.min(32, topicBytes.length)).map(b => b.toString(16).padStart( + 2, '0')).join(' '); + console.log('[MQTT调试] 主题原始字节(hex):', topicHex); + const topic = this.bytesToString(topicBytes); + console.log('[MQTT调试] 解析出的主题:', topic); + pos += topicLen; + + let packetId = 0; + if (qos > 0) { + if (buffer.length < pos + 2) { + console.error('[MQTT调试] PUBLISH 包包ID字段不完整'); + this.emit('error', new Error('PUBLISH 包包ID字段不完整')); + return; + } + packetId = buffer[pos++] * 256 + buffer[pos++]; + console.log('[MQTT调试] 报文标识符:', packetId); + } + + const payloadBytes = buffer.slice(pos); + console.log('[MQTT调试] 载荷字节长度:', payloadBytes.length); + if (payloadBytes.length > 0) { + const payloadHex = payloadBytes.slice(0, Math.min(64, payloadBytes.length)).map(b => b.toString(16) + .padStart(2, '0')).join(' '); + console.log('[MQTT调试] 载荷前64字节(hex):', payloadHex); + } + const payload = this.bytesToString(payloadBytes); + console.log('[MQTT调试] 解析出的载荷:', payload); + + this.emit('message', { + type: 'receive', + topic, + payload + }); + + if (qos === 1) { + this.sendPubackPacket(packetId); + } + } catch (error) { + console.error('[MQTT调试] 处理 PUBLISH 包时出错:', error); + this.emit('error', new Error(`处理 PUBLISH 包时出错: ${error.message}`)); + } + } + + /** + * 处理 PUBACK 包 + */ + handlePubackPacket(buffer, pos) { + if (buffer.length >= pos + 2) { + const packetId = buffer[pos++] * 256 + buffer[pos++]; + // 可以触发事件 + } + } + + /** + * 处理 SUBACK 包 + */ + handleSubackPacket(buffer, pos) { + if (buffer.length >= pos + 2) { + const packetId = buffer[pos++] * 256 + buffer[pos++]; + this.emit('subscribed', packetId); + } + } + + /** + * 处理 UNSUBACK 包 + */ + handleUnsubackPacket(buffer, pos) { + if (buffer.length >= pos + 2) { + const packetId = buffer[pos++] * 256 + buffer[pos++]; + this.emit('unsubscribed', packetId); + } + } + + /** + * 处理 DISCONNECT 包 + */ + handleDisconnectPacket(buffer, pos) { + this.setConnectionStatus('disconnected'); + this.emit('disconnect', { + type: 'server' + }); + } + + /** + * 发送 CONNECT 包 + */ + sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, keepAlive, + sessionExpiryInterval) { + try { + console.log('[MQTT调试] 发送 CONNECT 包,clean:', clean, 'cleanStart:', cleanStart, 'protocolVersion:', + protocolVersion); + let packet = ''; + packet += String.fromCharCode(0x10); + + let variableHeader = ''; + variableHeader += String.fromCharCode(0x00, 0x04); + variableHeader += 'MQTT'; + variableHeader += String.fromCharCode(protocolVersion); + + let flags = 0x00; + if (protocolVersion === 5) { + if (cleanStart) flags |= 0x02; + } else { + if (clean) flags |= 0x02; + } + if (username) flags |= 0x80; + if (password) flags |= 0x40; + variableHeader += String.fromCharCode(flags); + + variableHeader += String.fromCharCode((keepAlive >> 8) & 0xFF, keepAlive & 0xFF); + + // MQTT 5.0 属性处理 + if (protocolVersion === 5) { + let properties = ''; + let propertyLength = 0; + + // 添加会话过期间隔属性 (0x11) + if (!cleanStart && sessionExpiryInterval !== undefined) { + properties += String.fromCharCode(0x11); // 属性标识符 + properties += String.fromCharCode((sessionExpiryInterval >> 24) & 0xFF); + properties += String.fromCharCode((sessionExpiryInterval >> 16) & 0xFF); + properties += String.fromCharCode((sessionExpiryInterval >> 8) & 0xFF); + properties += String.fromCharCode(sessionExpiryInterval & 0xFF); + propertyLength += 1 + 4; // 标识符 + 4字节值 + } + + // 编码属性长度为可变字节整数 + let encodedPropertyLength = ''; + let propLen = propertyLength; + do { + let byte = propLen % 128; + propLen = Math.floor(propLen / 128); + if (propLen > 0) { + byte |= 0x80; + } + encodedPropertyLength += String.fromCharCode(byte); + } while (propLen > 0); + + variableHeader += encodedPropertyLength; + variableHeader += properties; + } + + let payload = this.encodeString(clientId); + if (username) payload += this.encodeString(username); + if (password) payload += this.encodeString(password); + + const remainingLength = variableHeader.length + payload.length; + packet += this.encodeRemainingLength(remainingLength); + packet += variableHeader; + packet += payload; + + this.sendPacket(packet); + } catch (error) { + this.emit('error', new Error(`发送 CONNECT 包失败: ${error.message}`)); + } + } + + /** + * 发送 DISCONNECT 包 + */ + sendDisconnectPacket() { + let packet = String.fromCharCode(0xE0); + packet += String.fromCharCode(0x00); + this.sendPacket(packet); + } + + /** + * 发送 PUBLISH 包 + */ + sendPublishPacket(topic, message, qos) { + let packet = ''; + + const flags = 0x30 | (qos << 1); + packet += String.fromCharCode(flags); + + const topicBytes = this.encodeString(topic); + const msgBytes = this.stringToBytes(message); + + let remainingLength = topicBytes.length + msgBytes.length; + + if (qos > 0) { + remainingLength += 2; + packet += this.encodeRemainingLength(remainingLength); + packet += topicBytes; + packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF); + this.messageId++; + } else { + packet += this.encodeRemainingLength(remainingLength); + packet += topicBytes; + } + + packet += msgBytes; + this.sendPacket(packet); + } + + /** + * 发送 SUBSCRIBE 包 + */ + sendSubscribePacket(topic, qos) { + let packet = String.fromCharCode(0x82); + + const topicBytes = this.encodeString(topic); + let remainingLength = 2 + topicBytes.length + 1; // packetId + topicBytes + qos + + if (this.config.protocolVersion === 5) { + remainingLength += 1; // property length byte (0x00) + } + + packet += this.encodeRemainingLength(remainingLength); + packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF); + + if (this.config.protocolVersion === 5) { + packet += String.fromCharCode(0x00); // property length = 0 + } + + packet += topicBytes; + packet += String.fromCharCode(qos); + + this.messageId++; + this.sendPacket(packet); + } + + /** + * 发送 UNSUBSCRIBE 包 + */ + sendUnsubscribePacket(topic) { + let packet = String.fromCharCode(0xA2); + + const topicBytes = this.encodeString(topic); + let remainingLength = 2 + topicBytes.length; // packetId + topicBytes + + if (this.config.protocolVersion === 5) { + remainingLength += 1; // property length byte (0x00) + } + + packet += this.encodeRemainingLength(remainingLength); + packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF); + + if (this.config.protocolVersion === 5) { + packet += String.fromCharCode(0x00); // property length = 0 + } + + packet += topicBytes; + + this.messageId++; + this.sendPacket(packet); + } + + /** + * 发送 PUBACK 包 + */ + sendPubackPacket(packetId) { + try { + let packet = String.fromCharCode(0x40); + packet += String.fromCharCode(0x02); + packet += String.fromCharCode((packetId >> 8) & 0xFF, packetId & 0xFF); + this.sendPacket(packet); + } catch (error) { + this.emit('error', new Error(`发送 PUBACK 失败: ${error.message}`)); + } + } + + /** + * 发送 PINGREQ 包 + */ + sendPingPacket() { + try { + // 清除之前的超时定时器 + if (this.pingTimeoutTimer) { + clearTimeout(this.pingTimeoutTimer); + this.pingTimeoutTimer = null; + } + + console.log('[MQTT调试] 发送 PINGREQ 包'); + let packet = String.fromCharCode(0xC0); + packet += String.fromCharCode(0x00); + this.sendPacket(packet); + + // 设置心跳响应超时定时器 + const { + keepAlive + } = this.config; + if (keepAlive > 0) { + const timeout = keepAlive * 1000 * 1.5; // 1.5倍keepAlive时间 + console.log('[MQTT调试] 设置心跳响应超时:', timeout, 'ms'); + this.pingTimeoutTimer = setTimeout(() => { + console.log('[MQTT调试] 心跳响应超时,服务器未回复 PINGRESP'); + this.emit('error', new Error('心跳响应超时,服务器未回复 PINGRESP')); + // 主动断开连接 + this.disconnect(); + }, timeout); + } + } catch (error) { + this.emit('error', new Error(`发送 PINGREQ 失败: ${error.message}`)); + } + } + + /** + * 启动心跳定时器 + */ + startPingInterval() { + this.stopPingInterval(); + const { + keepAlive + } = this.config; + console.log('[MQTT调试] 启动心跳定时器,keepAlive:', keepAlive); + if (keepAlive > 0) { + const interval = (keepAlive * 1000) * 0.8; + console.log('[MQTT调试] 心跳间隔:', interval, 'ms'); + this.pingIntervalTimer = setInterval(() => { + if (this.connectionStatus === 'connected') { + console.log('[MQTT调试] 发送心跳 PINGREQ'); + this.sendPingPacket(); + } + }, interval); + } + } + + /** + * 停止心跳定时器 + */ + stopPingInterval() { + if (this.pingIntervalTimer) { + console.log('[MQTT调试] 停止心跳定时器'); + clearInterval(this.pingIntervalTimer); + this.pingIntervalTimer = null; + } + // 清除心跳响应超时定时器 + if (this.pingTimeoutTimer) { + clearTimeout(this.pingTimeoutTimer); + this.pingTimeoutTimer = null; + console.log('[MQTT调试] 心跳响应超时定时器已清除'); + } + } + + // ========== 工具方法 ========== + + /** + * 将各种数据格式转换为字节数组 + */ + ab2buffer(data) { + try { + if (typeof data === 'string') { + const bytes = []; + for (let i = 0; i < data.length; i++) { + bytes.push(data.charCodeAt(i) & 0xFF); + } + return bytes; + } else if (data instanceof ArrayBuffer) { + return Array.from(new Uint8Array(data)); + } else if (Array.isArray(data)) { + return data; + } else { + return Array.from(data); + } + } catch (error) { + this.emit('error', new Error(`数据转换失败: ${error.message}`)); + return []; + } + } + + /** + * 字节数组转字符串(UTF-8 解码) + */ + bytesToString(bytes) { + try { + if (!bytes || bytes.length === 0) return ''; + + // 优先使用 TextDecoder 进行 UTF-8 解码 + if (typeof TextDecoder !== 'undefined') { + try { + const decoder = new TextDecoder('utf-8'); + const array = new Uint8Array(bytes); + return decoder.decode(array); + } catch (decodeError) { + console.warn('TextDecoder 解码失败,回退到手动解码:', decodeError); + } + } + + // 手动 UTF-8 解码 + let result = ''; + let i = 0; + while (i < bytes.length) { + const byte = bytes[i]; + + // ASCII 字符 (0-127) + if (byte < 128) { + result += String.fromCharCode(byte); + i++; + } + // 2字节 UTF-8 字符 + else if ((byte & 0xE0) === 0xC0 && i + 1 < bytes.length) { + const byte2 = bytes[i + 1]; + if ((byte2 & 0xC0) === 0x80) { + const codePoint = ((byte & 0x1F) << 6) | (byte2 & 0x3F); + result += String.fromCharCode(codePoint); + i += 2; + } else { + // 无效的 UTF-8 序列,跳过 + i++; + } + } + // 3字节 UTF-8 字符 + else if ((byte & 0xF0) === 0xE0 && i + 2 < bytes.length) { + const byte2 = bytes[i + 1]; + const byte3 = bytes[i + 2]; + if ((byte2 & 0xC0) === 0x80 && (byte3 & 0xC0) === 0x80) { + const codePoint = ((byte & 0x0F) << 12) | ((byte2 & 0x3F) << 6) | (byte3 & 0x3F); + result += String.fromCharCode(codePoint); + i += 3; + } else { + // 无效的 UTF-8 序列,跳过 + i++; + } + } + // 4字节 UTF-8 字符(代理对) + else if ((byte & 0xF8) === 0xF0 && i + 3 < bytes.length) { + const byte2 = bytes[i + 1]; + const byte3 = bytes[i + 2]; + const byte4 = bytes[i + 3]; + if ((byte2 & 0xC0) === 0x80 && (byte3 & 0xC0) === 0x80 && (byte4 & 0xC0) === 0x80) { + const codePoint = ((byte & 0x07) << 18) | ((byte2 & 0x3F) << 12) | ((byte3 & 0x3F) << 6) | ( + byte4 & 0x3F); + // 代理对处理 + if (codePoint <= 0xFFFF) { + result += String.fromCharCode(codePoint); + } else { + // 转换为代理对 + const highSurrogate = 0xD800 + (((codePoint - 0x10000) >> 10) & 0x3FF); + const lowSurrogate = 0xDC00 + ((codePoint - 0x10000) & 0x3FF); + result += String.fromCharCode(highSurrogate, lowSurrogate); + } + i += 4; + } else { + // 无效的 UTF-8 序列,跳过 + i++; + } + } else { + // 无效的 UTF-8 序列,跳过 + i++; + } + } + return result; + } catch (error) { + console.error('字节转字符串失败:', error); + this.emit('error', new Error(`字节转字符串失败: ${error.message}`)); + return ''; + } + } + + /** + * 编码字符串(添加长度前缀) + */ + encodeString(str) { + let result = ''; + const bytes = this.stringToBytes(str); + result += String.fromCharCode((bytes.length >> 8) & 0xFF); + result += String.fromCharCode(bytes.length & 0xFF); + for (let i = 0; i < bytes.length; i++) { + result += String.fromCharCode(bytes[i]); + } + return result; + } + + /** + * 字符串转字节数组(UTF-8 编码) + */ + stringToBytes(str) { + const bytes = []; + for (let i = 0; i < str.length; i++) { + let code = str.charCodeAt(i); + + if (code <= 0x7F) { + // 1字节 UTF-8 + bytes.push(code); + } else if (code <= 0x7FF) { + // 2字节 UTF-8 + bytes.push(0xC0 | (code >> 6)); + bytes.push(0x80 | (code & 0x3F)); + } else if (code >= 0xD800 && code <= 0xDFFF) { + // 代理对 - 4字节 UTF-8 + i++; + if (i < str.length) { + const high = code; + const low = str.charCodeAt(i); + const fullCode = ((high & 0x3FF) << 10) + (low & 0x3FF) + 0x10000; + bytes.push(0xF0 | (fullCode >> 18)); + bytes.push(0x80 | ((fullCode >> 12) & 0x3F)); + bytes.push(0x80 | ((fullCode >> 6) & 0x3F)); + bytes.push(0x80 | (fullCode & 0x3F)); + } + } else { + // 3字节 UTF-8 + bytes.push(0xE0 | (code >> 12)); + bytes.push(0x80 | ((code >> 6) & 0x3F)); + bytes.push(0x80 | (code & 0x3F)); + } + } + return bytes; + } + + /** + * 编码剩余长度 + */ + encodeRemainingLength(length) { + let result = ''; + do { + let byte = length % 128; + length = Math.floor(length / 128); + if (length > 0) { + byte |= 0x80; + } + result += String.fromCharCode(byte); + } while (length > 0); + return result; + } +} + +// 导出类 +export default MQTTClient; \ No newline at end of file diff --git a/uni_modules/yh-wsmqtt/package.json b/uni_modules/yh-wsmqtt/package.json deleted file mode 100644 index 974a84d..0000000 --- a/uni_modules/yh-wsmqtt/package.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "id": "yh-wsmqtt", - "name": "websocket MQTT 通讯插件,纯websocket 实现", - "displayName": "websocket MQTT 通讯插件,纯websocket 实现", - "version": "1.0.0", - "description": "websocket MQTT 通讯插件,纯websocket 实现", - "keywords": [ - "websocket;MQTT;uni" - ], - "dcloudext": { - "type": "js_sdk", - "sale": { - "regular": { - "price": "0.00" - }, - "sourcecode": { - "price": "0.00" - } - } - }, - "uni_modules": { - "dependencies": [], - "encrypt": [] - } -} \ No newline at end of file diff --git a/uni_modules/yh-wsmqtt/wsmqtt/README.md b/uni_modules/yh-wsmqtt/wsmqtt/README.md deleted file mode 100644 index a2fc8f0..0000000 --- a/uni_modules/yh-wsmqtt/wsmqtt/README.md +++ /dev/null @@ -1,173 +0,0 @@ -# MQTT WebSocket 客户端插件 - -基于 WebSocket 的 MQTT 客户端插件,支持 uni-app 多端运行。 - -## 功能特性 - -- ✅ 支持 MQTT 3.1.1 和 MQTT 5.0 协议 -- ✅ 基于 uni-app 的 WebSocket API 实现 -- ✅ 自动心跳保活机制 -- ✅ 支持自动重连 -- ✅ 提供 JavaScript SDK 和 Vue 组件两种使用方式 -- ✅ 支持订阅/发布消息 -- ✅ 完整的连接状态管理 -- ✅ 多端兼容:App、小程序、H5 - -## 安装 - -1. 将本插件文件夹 `wsmqtt` 复制到项目的 `uni_modules` 目录下 -2. 在需要使用的页面或组件中引入 - -## 使用方式 - -### 方式一:JavaScript SDK(推荐) - -```javascript -import MQTTClient from '@/uni_modules/wsmqtt/js_sdk/index.js' - -// 创建客户端实例 -const mqttClient = new MQTTClient({ - host: 'broker.emqx.io', - port: 8083, - path: '/mqtt', - clientId: 'client_' + Date.now(), - username: '', - password: '', - protocolVersion: 4, // 4 for MQTT 3.1.1, 5 for MQTT 5.0 - keepAlive: 60, - connectTimeout: 10000, - reconnectPeriod: 5000 -}) - -// 监听连接事件 -mqttClient.on('connect', () => { - console.log('连接成功') -}) - -// 监听消息事件 -mqttClient.on('message', (msg) => { - console.log('收到消息:', msg) -}) - -// 连接服务器 -mqttClient.connect() - -// 发布消息 -mqttClient.publish('test/topic', 'Hello MQTT', 0) - -// 订阅主题 -mqttClient.subscribe('test/topic', 0) - -// 取消订阅 -mqttClient.unsubscribe('test/topic') - -// 断开连接 -mqttClient.disconnect() -``` - -### 方式二:Vue 组件 - -```vue - - - -``` - -## API 文档 - -### MQTTClient 类 - -#### 构造函数 -```javascript -new MQTTClient(options) -``` - -#### 配置选项 -| 参数 | 类型 | 默认值 | 说明 | -|------|------|--------|------| -| host | string | '' | MQTT 服务器地址 | -| port | number | 8083 | 端口号 | -| path | string | '/mqtt' | WebSocket 路径 | -| clientId | string | 'mqttx_' + Date.now() | 客户端 ID | -| username | string | '' | 用户名 | -| password | string | '' | 密码 | -| protocolVersion | number | 4 | 协议版本:3(MQTT 3.1), 4(MQTT 3.1.1), 5(MQTT 5.0) | -| clean | boolean | true | MQTT 3.1.1 Clean Session | -| cleanStart | boolean | false | MQTT 5.0 Clean Start | -| keepAlive | number | 60 | 心跳间隔(秒) | -| sessionExpiryInterval | number | 4294967295 | 会话过期时间 | -| connectTimeout | number | 10000 | 连接超时时间(毫秒) | -| reconnectPeriod | number | 0 | 重连间隔(毫秒),0 表示不自动重连 | - -#### 方法 -- `connect()`: 连接服务器 -- `disconnect()`: 断开连接 -- `publish(topic, message, qos)`: 发布消息 -- `subscribe(topic, qos)`: 订阅主题 -- `unsubscribe(topic)`: 取消订阅 -- `on(event, callback)`: 添加事件监听 -- `off(event, callback)`: 移除事件监听 -- `getStatus()`: 获取当前连接状态 -- `getSubscribedTopics()`: 获取已订阅的主题列表 - -#### 事件 -- `connect`: 连接成功 -- `disconnect`: 连接断开 -- `error`: 发生错误 -- `message`: 收到或发送消息 -- `statusChange`: 连接状态变化 -- `subscribed`: 订阅成功 -- `unsubscribed`: 取消订阅成功 - -## 示例 - -本项目包含完整的示例页面,位于 `pages/mqtt-demo/index.vue`,展示了插件的所有功能。 - -## 注意事项 - -1. 小程序端需要使用真机调试,部分模拟器可能不支持 WebSocket -2. 确保服务器支持 WebSocket 协议的 MQTT -3. 不同平台的 WebSocket 实现可能有差异,建议在各平台测试 - -## 更新日志 - -详见 [CHANGELOG.md](./changelog.md) - -## 许可证 - -MIT License \ No newline at end of file diff --git a/uni_modules/yh-wsmqtt/wsmqtt/changelog.md b/uni_modules/yh-wsmqtt/wsmqtt/changelog.md deleted file mode 100644 index 6e3ec0d..0000000 --- a/uni_modules/yh-wsmqtt/wsmqtt/changelog.md +++ /dev/null @@ -1,11 +0,0 @@ -# 更新日志 - -## v1.0.0 (2026-01-15) - -### 新增 -- 初始版本发布 -- 基于 WebSocket 的 MQTT 客户端实现 -- 支持 MQTT 3.1.1 和 MQTT 5.0 协议 -- 提供完整的连接、发布、订阅、取消订阅功能 -- 内置心跳机制和自动重连 -- 提供 Vue 组件封装 \ No newline at end of file diff --git a/uni_modules/yh-wsmqtt/wsmqtt/components/mqtt-client.vue b/uni_modules/yh-wsmqtt/wsmqtt/components/mqtt-client.vue deleted file mode 100644 index e53079d..0000000 --- a/uni_modules/yh-wsmqtt/wsmqtt/components/mqtt-client.vue +++ /dev/null @@ -1,95 +0,0 @@ - - - - - diff --git a/uni_modules/yh-wsmqtt/wsmqtt/js_sdk/index.js b/uni_modules/yh-wsmqtt/wsmqtt/js_sdk/index.js deleted file mode 100644 index babca23..0000000 --- a/uni_modules/yh-wsmqtt/wsmqtt/js_sdk/index.js +++ /dev/null @@ -1,792 +0,0 @@ -/** - * MQTT 客户端插件 for uni-app - * 基于 WebSocket 实现 MQTT 协议 - */ -class MQTTClient { - /** - * 构造函数 - * @param {Object} options 配置选项 - */ - constructor(options = {}) { - // 默认配置 - this.config = { - host: '', - port: 8083, - path: '/mqtt', - clientId: 'mqttx_' + Date.now(), - username: '', - password: '', - protocolVersion: 4, // 4 for MQTT 3.1.1 - clean: true, - cleanStart: false, // for MQTT 5.0 - keepAlive: 60, - sessionExpiryInterval: 4294967295, - connectTimeout: 10000, - reconnectPeriod: 0 - }; - - // 合并用户配置 - Object.assign(this.config, options); - - // 状态 - this.socket = null; - this.connectionStatus = 'disconnected'; // 'disconnected', 'connecting', 'connected', 'error' - this.subscribedTopics = []; - this.messageId = 1; - this.connectTimeoutTimer = null; - this.reconnectTimer = null; - this.pingIntervalTimer = null; - - // 事件监听器 - this.eventListeners = { - connect: [], - disconnect: [], - error: [], - message: [], - statusChange: [] - }; - - // 绑定方法 - this.on = this.on.bind(this); - this.off = this.off.bind(this); - this.connect = this.connect.bind(this); - this.disconnect = this.disconnect.bind(this); - this.publish = this.publish.bind(this); - this.subscribe = this.subscribe.bind(this); - this.unsubscribe = this.unsubscribe.bind(this); - } - - /** - * 添加事件监听器 - * @param {string} event 事件名称: 'connect', 'disconnect', 'error', 'message', 'statusChange' - * @param {Function} callback 回调函数 - */ - on(event, callback) { - if (this.eventListeners[event]) { - this.eventListeners[event].push(callback); - } - } - - /** - * 移除事件监听器 - * @param {string} event 事件名称 - * @param {Function} callback 回调函数 - */ - off(event, callback) { - if (this.eventListeners[event]) { - const index = this.eventListeners[event].indexOf(callback); - if (index > -1) { - this.eventListeners[event].splice(index, 1); - } - } - } - - /** - * 触发事件 - * @param {string} event 事件名称 - * @param {...any} args 参数 - */ - emit(event, ...args) { - if (this.eventListeners[event]) { - this.eventListeners[event].forEach(callback => { - try { - callback(...args); - } catch (err) { - console.error(`Error in ${event} event listener:`, err); - } - }); - } - } - - /** - * 更新连接状态 - * @param {string} status 新状态 - */ - setConnectionStatus(status) { - if (this.connectionStatus !== status) { - this.connectionStatus = status; - this.emit('statusChange', status); - } - } - - /** - * 连接 MQTT 服务器 - */ - connect() { - const { host, port, path, clientId, username, password, protocolVersion, clean, cleanStart, keepAlive, sessionExpiryInterval, connectTimeout } = this.config; - - if (!host) { - this.emit('error', new Error('请输入主机地址')); - return; - } - - this.setConnectionStatus('connecting'); - this.emit('statusChange', 'connecting'); - - // 构建 WebSocket URL - let urlPath = path; - if (!urlPath.startsWith('/')) { - urlPath = '/' + urlPath; - } - let url = `ws://${host}:${port}${urlPath}`; - - // 关闭现有连接 - if (this.socket) { - this.socket.close(); - this.socket = null; - } - - // 建立 WebSocket 连接 - this.socket = uni.connectSocket({ - url, - protocols: ['mqttv3.1.1', 'mqtt'], - header: { - 'content-type': 'application/octet-stream' - }, - success: () => { - // 连接请求已发送 - }, - fail: (err) => { - this.setConnectionStatus('error'); - this.emit('error', new Error(`WebSocket 连接失败: ${JSON.stringify(err)}`)); - } - }); - - // 设置连接超时 - if (connectTimeout > 0) { - this.connectTimeoutTimer = setTimeout(() => { - if (this.connectionStatus === 'connecting') { - this.setConnectionStatus('error'); - this.emit('error', new Error(`连接超时 (${connectTimeout}ms)`)); - if (this.socket) { - this.socket.close(); - this.socket = null; - } - } - }, connectTimeout); - } - - // WebSocket 事件监听 - this.socket.onOpen(() => { - if (this.connectTimeoutTimer) { - clearTimeout(this.connectTimeoutTimer); - this.connectTimeoutTimer = null; - } - this.sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, keepAlive, sessionExpiryInterval); - }); - - this.socket.onMessage((res) => { - this.handleMQTTMessage(res.data); - }); - - this.socket.onError((err) => { - if (this.connectTimeoutTimer) { - clearTimeout(this.connectTimeoutTimer); - this.connectTimeoutTimer = null; - } - this.stopPingInterval(); - this.setConnectionStatus('error'); - this.emit('error', new Error(`WebSocket 错误: ${JSON.stringify(err)}`)); - }); - - this.socket.onClose((res) => { - if (this.connectTimeoutTimer) { - clearTimeout(this.connectTimeoutTimer); - this.connectTimeoutTimer = null; - } - this.stopPingInterval(); - - if (this.connectionStatus === 'connected' && this.config.reconnectPeriod > 0) { - this.reconnectTimer = setTimeout(() => { - this.connect(); - }, this.config.reconnectPeriod); - } - - this.setConnectionStatus('disconnected'); - this.emit('disconnect', res); - }); - } - - /** - * 断开连接 - */ - disconnect() { - if (this.connectTimeoutTimer) { - clearTimeout(this.connectTimeoutTimer); - this.connectTimeoutTimer = null; - } - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - this.reconnectTimer = null; - } - this.stopPingInterval(); - - if (this.socket) { - this.sendDisconnectPacket(); - setTimeout(() => { - this.socket.close(); - this.socket = null; - this.setConnectionStatus('disconnected'); - this.subscribedTopics = []; - }, 100); - } - } - - /** - * 发布消息 - * @param {string} topic 主题 - * @param {string} message 消息内容 - * @param {number} qos QoS 等级 (0, 1, 2) - */ - publish(topic, message, qos = 0) { - if (this.connectionStatus !== 'connected') { - this.emit('error', new Error('请先连接')); - return; - } - - if (!topic) { - this.emit('error', new Error('请输入主题')); - return; - } - - try { - this.sendPublishPacket(topic, message, qos); - this.emit('message', { type: 'send', topic, message }); - } catch (error) { - this.emit('error', new Error(`发布失败: ${error.message}`)); - } - } - - /** - * 订阅主题 - * @param {string} topic 主题 - * @param {number} qos QoS 等级 (0, 1, 2) - */ - subscribe(topic, qos = 0) { - if (this.connectionStatus !== 'connected') { - this.emit('error', new Error('请先连接')); - return; - } - - if (!topic) { - this.emit('error', new Error('请输入订阅主题')); - return; - } - - try { - this.sendSubscribePacket(topic, qos); - if (!this.subscribedTopics.includes(topic)) { - this.subscribedTopics.push(topic); - } - } catch (error) { - this.emit('error', new Error(`订阅失败: ${error.message}`)); - } - } - - /** - * 取消订阅 - * @param {string} topic 主题 - */ - unsubscribe(topic) { - if (this.connectionStatus !== 'connected') { - this.emit('error', new Error('请先连接')); - return; - } - - if (!topic) { - this.emit('error', new Error('请输入取消订阅的主题')); - return; - } - - try { - this.sendUnsubscribePacket(topic); - const index = this.subscribedTopics.indexOf(topic); - if (index > -1) { - this.subscribedTopics.splice(index, 1); - } - } catch (error) { - this.emit('error', new Error(`取消订阅失败: ${error.message}`)); - } - } - - /** - * 获取当前连接状态 - * @returns {string} 连接状态 - */ - getStatus() { - return this.connectionStatus; - } - - /** - * 获取已订阅的主题列表 - * @returns {Array} 主题列表 - */ - getSubscribedTopics() { - return [...this.subscribedTopics]; - } - - // ========== 内部方法 ========== - - /** - * 发送数据包 - * @param {string|ArrayBuffer|Array} packet 数据包 - */ - sendPacket(packet) { - if (!this.socket) { - return; - } - - let arrayBuffer; - if (typeof packet === 'string') { - const bytes = this.ab2buffer(packet); - arrayBuffer = new Uint8Array(bytes).buffer; - } else if (packet instanceof ArrayBuffer) { - arrayBuffer = packet; - } else if (Array.isArray(packet)) { - arrayBuffer = new Uint8Array(packet).buffer; - } else { - this.emit('error', new Error('未知的数据包格式')); - return; - } - - uni.sendSocketMessage({ - data: arrayBuffer, - success: () => { - // 发送成功 - }, - fail: (err) => { - this.emit('error', new Error(`数据发送失败: ${JSON.stringify(err)}`)); - } - }); - } - - /** - * 处理 MQTT 消息 - * @param {*} data 原始数据 - */ - handleMQTTMessage(data) { - try { - const buffer = this.ab2buffer(data); - - if (buffer.length === 0) { - return; - } - - const packetType = buffer[0] & 0xF0; - const flags = buffer[0] & 0x0F; - - let pos = 1; - let remainingLength = 0; - let multiplier = 1; - let encodedByte; - - do { - if (pos >= buffer.length) { - return; - } - encodedByte = buffer[pos]; - remainingLength += (encodedByte & 0x7F) * multiplier; - multiplier *= 128; - pos++; - } while ((encodedByte & 0x80) !== 0 && pos < buffer.length); - - if (packetType === 0x20) { - this.handleConnackPacket(buffer, pos); - } else if (packetType === 0x30 || packetType === 0x32) { - this.handlePublishPacket(buffer, pos, flags); - } else if (packetType === 0x40) { - this.handlePubackPacket(buffer, pos); - } else if (packetType === 0x90) { - this.handleSubackPacket(buffer, pos); - } else if (packetType === 0xB0) { - this.handleUnsubackPacket(buffer, pos); - } else if (packetType === 0xD0) { - // 心跳响应 - } else if (packetType === 0xE0) { - this.handleDisconnectPacket(buffer, pos); - } - } catch (error) { - this.emit('error', new Error(`处理 MQTT 消息时出错: ${error.message}`)); - } - } - - /** - * 处理 CONNACK 包 - */ - handleConnackPacket(buffer, pos) { - if (buffer.length < pos + 2) { - this.emit('error', new Error('CONNACK 包长度不足')); - return; - } - - const flags = buffer[pos++]; - const reasonCode = buffer[pos++]; - - const reasonCodes = { - 0: '成功', - 128: '协议版本错误', - 129: '客户端ID被拒绝', - 130: '服务器不可用', - 131: '用户名密码错误', - 132: '未授权', - 133: '服务器未找到', - 134: '服务器不可用', - 135: '用户名密码无效' - }; - - const reasonText = reasonCodes[reasonCode] || `未知错误 (${reasonCode})`; - - if (reasonCode === 0) { - this.setConnectionStatus('connected'); - this.emit('connect'); - this.startPingInterval(); - } else { - this.setConnectionStatus('error'); - this.emit('error', new Error(`MQTT 连接失败: ${reasonText}`)); - } - } - - /** - * 处理 PUBLISH 包 - */ - handlePublishPacket(buffer, pos, flags) { - try { - const qos = (flags >> 1) & 0x03; - - if (buffer.length < pos + 2) { - this.emit('error', new Error('PUBLISH 包主题长度字段不完整')); - return; - } - const topicLen = buffer[pos++] * 256 + buffer[pos++]; - - if (buffer.length < pos + topicLen) { - this.emit('error', new Error('PUBLISH 包主题数据不完整')); - return; - } - const topicBytes = buffer.slice(pos, pos + topicLen); - const topic = this.bytesToString(topicBytes); - pos += topicLen; - - let packetId = 0; - if (qos > 0) { - if (buffer.length < pos + 2) { - this.emit('error', new Error('PUBLISH 包包ID字段不完整')); - return; - } - packetId = buffer[pos++] * 256 + buffer[pos++]; - } - - const payloadBytes = buffer.slice(pos); - const payload = this.bytesToString(payloadBytes); - - this.emit('message', { type: 'receive', topic, payload }); - - if (qos === 1) { - this.sendPubackPacket(packetId); - } - } catch (error) { - this.emit('error', new Error(`处理 PUBLISH 包时出错: ${error.message}`)); - } - } - - /** - * 处理 PUBACK 包 - */ - handlePubackPacket(buffer, pos) { - if (buffer.length >= pos + 2) { - const packetId = buffer[pos++] * 256 + buffer[pos++]; - // 可以触发事件 - } - } - - /** - * 处理 SUBACK 包 - */ - handleSubackPacket(buffer, pos) { - if (buffer.length >= pos + 2) { - const packetId = buffer[pos++] * 256 + buffer[pos++]; - this.emit('subscribed', packetId); - } - } - - /** - * 处理 UNSUBACK 包 - */ - handleUnsubackPacket(buffer, pos) { - if (buffer.length >= pos + 2) { - const packetId = buffer[pos++] * 256 + buffer[pos++]; - this.emit('unsubscribed', packetId); - } - } - - /** - * 处理 DISCONNECT 包 - */ - handleDisconnectPacket(buffer, pos) { - this.setConnectionStatus('disconnected'); - this.emit('disconnect', { type: 'server' }); - } - - /** - * 发送 CONNECT 包 - */ - sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, keepAlive, sessionExpiryInterval) { - try { - let packet = ''; - packet += String.fromCharCode(0x10); - - let variableHeader = ''; - variableHeader += String.fromCharCode(0x00, 0x04); - variableHeader += 'MQTT'; - variableHeader += String.fromCharCode(protocolVersion); - - let flags = 0x00; - if (protocolVersion === 5) { - if (cleanStart) flags |= 0x02; - } else { - if (clean) flags |= 0x02; - } - if (username) flags |= 0x80; - if (password) flags |= 0x40; - variableHeader += String.fromCharCode(flags); - - variableHeader += String.fromCharCode((keepAlive >> 8) & 0xFF, keepAlive & 0xFF); - - let payload = this.encodeString(clientId); - if (username) payload += this.encodeString(username); - if (password) payload += this.encodeString(password); - - const remainingLength = variableHeader.length + payload.length; - packet += this.encodeRemainingLength(remainingLength); - packet += variableHeader; - packet += payload; - - this.sendPacket(packet); - } catch (error) { - this.emit('error', new Error(`发送 CONNECT 包失败: ${error.message}`)); - } - } - - /** - * 发送 DISCONNECT 包 - */ - sendDisconnectPacket() { - let packet = String.fromCharCode(0xE0); - packet += String.fromCharCode(0x00); - this.sendPacket(packet); - } - - /** - * 发送 PUBLISH 包 - */ - sendPublishPacket(topic, message, qos) { - let packet = ''; - - const flags = 0x30 | (qos << 1); - packet += String.fromCharCode(flags); - - const topicBytes = this.encodeString(topic); - const msgBytes = this.stringToBytes(message); - - let remainingLength = topicBytes.length + msgBytes.length; - - if (qos > 0) { - remainingLength += 2; - packet += this.encodeRemainingLength(remainingLength); - packet += topicBytes; - packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF); - this.messageId++; - } else { - packet += this.encodeRemainingLength(remainingLength); - packet += topicBytes; - } - - packet += msgBytes; - this.sendPacket(packet); - } - - /** - * 发送 SUBSCRIBE 包 - */ - sendSubscribePacket(topic, qos) { - let packet = String.fromCharCode(0x82); - - const topicBytes = this.encodeString(topic); - let remainingLength = 2 + 1 + topicBytes.length + 1; - - packet += this.encodeRemainingLength(remainingLength); - packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF); - - packet += String.fromCharCode(0x00); - packet += topicBytes; - packet += String.fromCharCode(qos); - - this.messageId++; - this.sendPacket(packet); - } - - /** - * 发送 UNSUBSCRIBE 包 - */ - sendUnsubscribePacket(topic) { - let packet = String.fromCharCode(0xA2); - - const topicBytes = this.encodeString(topic); - let remainingLength = 2 + 1 + topicBytes.length; - - packet += this.encodeRemainingLength(remainingLength); - packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF); - - packet += String.fromCharCode(0x00); - packet += topicBytes; - - this.messageId++; - this.sendPacket(packet); - } - - /** - * 发送 PUBACK 包 - */ - sendPubackPacket(packetId) { - try { - let packet = String.fromCharCode(0x40); - packet += String.fromCharCode(0x02); - packet += String.fromCharCode((packetId >> 8) & 0xFF, packetId & 0xFF); - this.sendPacket(packet); - } catch (error) { - this.emit('error', new Error(`发送 PUBACK 失败: ${error.message}`)); - } - } - - /** - * 发送 PINGREQ 包 - */ - sendPingPacket() { - try { - let packet = String.fromCharCode(0xC0); - packet += String.fromCharCode(0x00); - this.sendPacket(packet); - } catch (error) { - this.emit('error', new Error(`发送 PINGREQ 失败: ${error.message}`)); - } - } - - /** - * 启动心跳定时器 - */ - startPingInterval() { - this.stopPingInterval(); - const { keepAlive } = this.config; - if (keepAlive > 0) { - const interval = (keepAlive * 1000) * 0.8; - this.pingIntervalTimer = setInterval(() => { - if (this.connectionStatus === 'connected') { - this.sendPingPacket(); - } - }, interval); - } - } - - /** - * 停止心跳定时器 - */ - stopPingInterval() { - if (this.pingIntervalTimer) { - clearInterval(this.pingIntervalTimer); - this.pingIntervalTimer = null; - } - } - - // ========== 工具方法 ========== - - /** - * 将各种数据格式转换为字节数组 - */ - ab2buffer(data) { - try { - if (typeof data === 'string') { - const bytes = []; - for (let i = 0; i < data.length; i++) { - bytes.push(data.charCodeAt(i) & 0xFF); - } - return bytes; - } else if (data instanceof ArrayBuffer) { - return Array.from(new Uint8Array(data)); - } else if (Array.isArray(data)) { - return data; - } else { - return Array.from(data); - } - } catch (error) { - this.emit('error', new Error(`数据转换失败: ${error.message}`)); - return []; - } - } - - /** - * 字节数组转字符串 - */ - bytesToString(bytes) { - try { - if (!bytes || bytes.length === 0) return ''; - - let str = ''; - for (let i = 0; i < bytes.length; i++) { - const byte = bytes[i] & 0xFF; - if (byte >= 32 && byte <= 126) { - str += String.fromCharCode(byte); - } else { - str += `\\x${byte.toString(16).padStart(2, '0')}`; - } - } - return str; - } catch (error) { - this.emit('error', new Error(`字节转字符串失败: ${error.message}`)); - return ''; - } - } - - /** - * 编码字符串(添加长度前缀) - */ - encodeString(str) { - let result = ''; - const bytes = this.stringToBytes(str); - result += String.fromCharCode((bytes.length >> 8) & 0xFF); - result += String.fromCharCode(bytes.length & 0xFF); - for (let i = 0; i < bytes.length; i++) { - result += String.fromCharCode(bytes[i]); - } - return result; - } - - /** - * 字符串转字节数组 - */ - stringToBytes(str) { - const bytes = []; - for (let i = 0; i < str.length; i++) { - bytes.push(str.charCodeAt(i)); - } - return bytes; - } - - /** - * 编码剩余长度 - */ - encodeRemainingLength(length) { - let result = ''; - do { - let byte = length % 128; - length = Math.floor(length / 128); - if (length > 0) { - byte |= 0x80; - } - result += String.fromCharCode(byte); - } while (length > 0); - return result; - } -} - -// 导出类 -export default MQTTClient; \ No newline at end of file diff --git a/uni_modules/yh-wsmqtt/wsmqtt/package.json b/uni_modules/yh-wsmqtt/wsmqtt/package.json deleted file mode 100644 index 27d5c1e..0000000 --- a/uni_modules/yh-wsmqtt/wsmqtt/package.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "uni-plugin-mqtt-websocket", - "version": "1.0.0", - "description": "基于 WebSocket 的 MQTT 客户端插件,支持 uni-app 多端运行", - "main": "js_sdk/index.js", - "keywords": [ - "uni-app", - "mqtt", - "websocket", - "iot", - "real-time" - ], - "author": "开发者", - "license": "MIT", - "repository": { - "type": "git", - "url": "" - }, - "bugs": { - "url": "" - }, - "homepage": "", - "uni_modules": { - "id": "wsmqtt", - "displayName": "MQTT WebSocket 客户端", - "description": "基于 WebSocket 的 MQTT 客户端插件", - "category": "network", - "platforms": { - "app-plus": {}, - "mp-weixin": {}, - "mp-alipay": {}, - "mp-baidu": {}, - "mp-toutiao": {} - } - } -} \ No newline at end of file