This commit is contained in:
pengqiao1993
2026-03-10 16:36:41 +08:00
parent 9b0d792dbf
commit e1798023c2
7 changed files with 1413 additions and 287 deletions

View File

@ -1,27 +1,16 @@
<template> <template>
<view style="height: 100vh"> <view style="height: 100vh">
<view class=""> <view class="">
<u-navbar <u-navbar title="策略下发" :is-back="true" :background="background" :border-bottom="false"
title="策略下发" :custom-back="toback">
:is-back="true"
:background="background"
:border-bottom="false"
:custom-back="toback"
>
</u-navbar> </u-navbar>
</view> </view>
<view class="policeForm"> <view class="policeForm">
<view class="" v-for="(item, index) in smallArr" :key="index"> <view class="" v-for="(item, index) in smallArr" :key="index">
<view class="" v-if="item.type == 'Input'"> <view class="" v-if="item.type == 'Input'">
<u-form-item :label="item[`label_${lang}`]" :prop="item.prop"> <u-form-item :label="item[`label_${lang}`]" :prop="item.prop">
<u-input <u-input :min="item.min" :max="item.max" v-model.number="item.value" type="number"
:min="item.min" :placeholder="item[`place_${lang}`]" @input="handleInput(item, $event)" />
:max="item.max"
v-model.number="item.value"
type="number"
:placeholder="item[`place_${lang}`]"
@input="handleInput(item, $event)"
/>
</u-form-item> </u-form-item>
</view> </view>
<view class="" v-if="item.type == 'Switch'"> <view class="" v-if="item.type == 'Switch'">
@ -29,56 +18,50 @@
<u-switch :size="40" v-model="item.value"></u-switch> <u-switch :size="40" v-model="item.value"></u-switch>
</u-form-item> </u-form-item>
</view> </view>
<view <view class="" v-if="item.type == 'Select'" @click="selectShow(item, index)">
class=""
v-if="item.type == 'Select'"
@click="selectShow(item, index)"
>
<u-form-item :label="item[`label_${lang}`]" :prop="item.prop"> <u-form-item :label="item[`label_${lang}`]" :prop="item.prop">
<view class="" v-show="false"> <view class="" v-show="false">
<u-input <u-input v-model="item.value" disabled @click="selectShow(item, index)" />
v-model="item.value"
disabled
@click="selectShow(item, index)"
/>
</view> </view>
<view class=""> <view class="">
{{ item.selectLabel || $t("homePage.alarm.placeSelect") }} {{ item.selectLabel || $t("homePage.alarm.placeSelect") }}
</view> </view>
<u-select <u-select :key="item.prop" @confirm="confirm($event, item)" v-model="item.isShow"
:key="item.prop" :list="getSelectList(item)"></u-select>
@confirm="confirm($event, item)"
v-model="item.isShow"
:list="getSelectList(item)"
></u-select>
</u-form-item> </u-form-item>
</view> </view>
</view> </view>
<button <button @click="submitDevice" type="success" size="mini" style="
@click="submitDevice"
type="success"
size="mini"
style="
background-color: #009458; background-color: #009458;
padding: 10rpx 0; padding: 10rpx 0;
color: #fff; color: #fff;
margin-top: 40rpx; margin-top: 40rpx;
width: 100%; width: 100%;
" ">
>
{{ $t("homePage.mine.submit") }} {{ $t("homePage.mine.submit") }}
</button> </button>
</view> </view>
<mqtt-client :options="mqttOptions" @connect="onConnect" @message="onMessage">
</mqtt-client>
</view> </view>
</template> </template>
<script> <script>
import { formList } from "@/common/form.js"; import {
import { Langlist } from "@/common/lang"; formList
} from "@/common/form.js";
import {
Langlist
} from "@/common/lang";
import pako from "pako"; import pako from "pako";
import mqttClient from '@/uni_modules/yh-wsmqtt/wsmqtt/components/mqtt-client.vue'
import * as Paho from "@/static/lib/mqtt.js"; import * as Paho from "@/static/lib/mqtt.js";
export default { export default {
components: {
mqttClient
},
data() { data() {
return { return {
formList: formList, formList: formList,
@ -89,6 +72,11 @@ export default {
mqttClient: null, mqttClient: null,
backData: {}, backData: {},
smallArr: [], smallArr: [],
mqttOptions: {
host: '13.39.200.14',
port: 8083,
path: '/mqtt'
}
}; };
}, },
computed: { computed: {
@ -108,17 +96,23 @@ export default {
}, },
}, },
onShow() { onShow() {
if (!this.mqttClient) { // if (!this.mqttClient) {
this.initMQTT() // this.initMQTT()
} // }
}, },
beforeDestroy() { beforeDestroy() {
if (this.mqttClient) { // if (this.mqttClient) {
this.mqttClient.disconnect() // this.mqttClient.disconnect()
this.mqttClient = null // this.mqttClient = null
} // }
}, },
methods: { methods: {
onConnect() {
console.log('连接成功')
},
onMessage(msg) {
console.log('收到消息:', msg)
},
decodeGzipBase64(base64Str) { decodeGzipBase64(base64Str) {
// 1. base64 → Uint8Array // 1. base64 → Uint8Array
const binaryStr = atob(base64Str); const binaryStr = atob(base64Str);

View File

@ -0,0 +1,25 @@
{
"id": "yh-wsmqtt",
"name": "websocket MQTT 通讯插件纯websocket 实现",
"displayName": "websocket MQTT 通讯插件纯websocket 实现",
"version": "1.0.0",
"description": "websocket MQTT 通讯插件纯websocket 实现",
"keywords": [
"websocket;MQTT;uni"
],
"dcloudext": {
"type": "js_sdk",
"sale": {
"regular": {
"price": "0.00"
},
"sourcecode": {
"price": "0.00"
}
}
},
"uni_modules": {
"dependencies": [],
"encrypt": []
}
}

View File

@ -0,0 +1,173 @@
# MQTT WebSocket 客户端插件
基于 WebSocket 的 MQTT 客户端插件,支持 uni-app 多端运行。
## 功能特性
- ✅ 支持 MQTT 3.1.1 和 MQTT 5.0 协议
- ✅ 基于 uni-app 的 WebSocket API 实现
- ✅ 自动心跳保活机制
- ✅ 支持自动重连
- ✅ 提供 JavaScript SDK 和 Vue 组件两种使用方式
- ✅ 支持订阅/发布消息
- ✅ 完整的连接状态管理
- ✅ 多端兼容App、小程序、H5
## 安装
1. 将本插件文件夹 `wsmqtt` 复制到项目的 `uni_modules` 目录下
2. 在需要使用的页面或组件中引入
## 使用方式
### 方式一JavaScript SDK推荐
```javascript
import MQTTClient from '@/uni_modules/wsmqtt/js_sdk/index.js'
// 创建客户端实例
const mqttClient = new MQTTClient({
host: 'broker.emqx.io',
port: 8083,
path: '/mqtt',
clientId: 'client_' + Date.now(),
username: '',
password: '',
protocolVersion: 4, // 4 for MQTT 3.1.1, 5 for MQTT 5.0
keepAlive: 60,
connectTimeout: 10000,
reconnectPeriod: 5000
})
// 监听连接事件
mqttClient.on('connect', () => {
console.log('连接成功')
})
// 监听消息事件
mqttClient.on('message', (msg) => {
console.log('收到消息:', msg)
})
// 连接服务器
mqttClient.connect()
// 发布消息
mqttClient.publish('test/topic', 'Hello MQTT', 0)
// 订阅主题
mqttClient.subscribe('test/topic', 0)
// 取消订阅
mqttClient.unsubscribe('test/topic')
// 断开连接
mqttClient.disconnect()
```
### 方式二Vue 组件
```vue
<template>
<view>
<mqtt-client :options="mqttOptions" @connect="onConnect" @message="onMessage">
<template v-slot="{ connect, disconnect, publish, status }">
<button @click="connect">连接</button>
<button @click="disconnect">断开</button>
<text>状态: {{ status }}</text>
</template>
</mqtt-client>
</view>
</template>
<script>
import mqttClient from '@/uni_modules/wsmqtt/components/mqtt-client.vue'
export default {
components: {
mqttClient
},
data() {
return {
mqttOptions: {
host: 'broker.emqx.io',
port: 8083,
path: '/mqtt'
}
}
},
methods: {
onConnect() {
console.log('连接成功')
},
onMessage(msg) {
console.log('收到消息:', msg)
}
}
}
</script>
```
## API 文档
### MQTTClient 类
#### 构造函数
```javascript
new MQTTClient(options)
```
#### 配置选项
| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| host | string | '' | MQTT 服务器地址 |
| port | number | 8083 | 端口号 |
| path | string | '/mqtt' | WebSocket 路径 |
| clientId | string | 'mqttx_' + Date.now() | 客户端 ID |
| username | string | '' | 用户名 |
| password | string | '' | 密码 |
| protocolVersion | number | 4 | 协议版本3(MQTT 3.1), 4(MQTT 3.1.1), 5(MQTT 5.0) |
| clean | boolean | true | MQTT 3.1.1 Clean Session |
| cleanStart | boolean | false | MQTT 5.0 Clean Start |
| keepAlive | number | 60 | 心跳间隔(秒) |
| sessionExpiryInterval | number | 4294967295 | 会话过期时间 |
| connectTimeout | number | 10000 | 连接超时时间(毫秒) |
| reconnectPeriod | number | 0 | 重连间隔毫秒0 表示不自动重连 |
#### 方法
- `connect()`: 连接服务器
- `disconnect()`: 断开连接
- `publish(topic, message, qos)`: 发布消息
- `subscribe(topic, qos)`: 订阅主题
- `unsubscribe(topic)`: 取消订阅
- `on(event, callback)`: 添加事件监听
- `off(event, callback)`: 移除事件监听
- `getStatus()`: 获取当前连接状态
- `getSubscribedTopics()`: 获取已订阅的主题列表
#### 事件
- `connect`: 连接成功
- `disconnect`: 连接断开
- `error`: 发生错误
- `message`: 收到或发送消息
- `statusChange`: 连接状态变化
- `subscribed`: 订阅成功
- `unsubscribed`: 取消订阅成功
## 示例
本项目包含完整的示例页面,位于 `pages/mqtt-demo/index.vue`,展示了插件的所有功能。
## 注意事项
1. 小程序端需要使用真机调试,部分模拟器可能不支持 WebSocket
2. 确保服务器支持 WebSocket 协议的 MQTT
3. 不同平台的 WebSocket 实现可能有差异,建议在各平台测试
## 更新日志
详见 [CHANGELOG.md](./changelog.md)
## 许可证
MIT License

View File

@ -0,0 +1,11 @@
# 更新日志
## v1.0.0 (2026-01-15)
### 新增
- 初始版本发布
- 基于 WebSocket 的 MQTT 客户端实现
- 支持 MQTT 3.1.1 和 MQTT 5.0 协议
- 提供完整的连接、发布、订阅、取消订阅功能
- 内置心跳机制和自动重连
- 提供 Vue 组件封装

View File

@ -0,0 +1,95 @@
<template>
<view>
<!-- MQTT 客户端组件 -->
<slot :connect="connect" :disconnect="disconnect" :publish="publish" :subscribe="subscribe" :unsubscribe="unsubscribe" :status="status" :topics="topics">
<!-- 默认插槽内容用户可自定义 -->
</slot>
</view>
</template>
<script>
import MQTTClient from '../js_sdk/index.js'
export default {
name: 'mqtt-client',
props: {
options: {
type: Object,
default: () => ({})
}
},
data() {
return {
mqtt: null,
status: 'disconnected',
topics: []
}
},
created() {
this.mqtt = new MQTTClient(this.options)
this.mqtt.on('statusChange', (status) => {
this.status = status
this.$emit('status-change', status)
})
this.mqtt.on('message', (msg) => {
this.$emit('message', msg)
})
this.mqtt.on('connect', () => {
this.$emit('connect')
})
this.mqtt.on('disconnect', (res) => {
this.$emit('disconnect', res)
})
this.mqtt.on('error', (error) => {
this.$emit('error', error)
})
},
beforeDestroy() {
if (this.mqtt) {
this.mqtt.disconnect()
}
},
methods: {
connect() {
if (this.mqtt) {
this.mqtt.connect()
}
},
disconnect() {
if (this.mqtt) {
this.mqtt.disconnect()
}
},
publish(topic, message, qos = 0) {
if (this.mqtt) {
this.mqtt.publish(topic, message, qos)
}
},
subscribe(topic, qos = 0) {
if (this.mqtt) {
this.mqtt.subscribe(topic, qos)
if (!this.topics.includes(topic)) {
this.topics.push(topic)
}
}
},
unsubscribe(topic) {
if (this.mqtt) {
this.mqtt.unsubscribe(topic)
const index = this.topics.indexOf(topic)
if (index > -1) {
this.topics.splice(index, 1)
}
}
}
}
}
</script>
<style>
</style>

View File

@ -0,0 +1,792 @@
/**
* MQTT 客户端插件 for uni-app
* 基于 WebSocket 实现 MQTT 协议
*/
class MQTTClient {
/**
* 构造函数
* @param {Object} options 配置选项
*/
constructor(options = {}) {
// 默认配置
this.config = {
host: '',
port: 8083,
path: '/mqtt',
clientId: 'mqttx_' + Date.now(),
username: '',
password: '',
protocolVersion: 4, // 4 for MQTT 3.1.1
clean: true,
cleanStart: false, // for MQTT 5.0
keepAlive: 60,
sessionExpiryInterval: 4294967295,
connectTimeout: 10000,
reconnectPeriod: 0
};
// 合并用户配置
Object.assign(this.config, options);
// 状态
this.socket = null;
this.connectionStatus = 'disconnected'; // 'disconnected', 'connecting', 'connected', 'error'
this.subscribedTopics = [];
this.messageId = 1;
this.connectTimeoutTimer = null;
this.reconnectTimer = null;
this.pingIntervalTimer = null;
// 事件监听器
this.eventListeners = {
connect: [],
disconnect: [],
error: [],
message: [],
statusChange: []
};
// 绑定方法
this.on = this.on.bind(this);
this.off = this.off.bind(this);
this.connect = this.connect.bind(this);
this.disconnect = this.disconnect.bind(this);
this.publish = this.publish.bind(this);
this.subscribe = this.subscribe.bind(this);
this.unsubscribe = this.unsubscribe.bind(this);
}
/**
* 添加事件监听器
* @param {string} event 事件名称: 'connect', 'disconnect', 'error', 'message', 'statusChange'
* @param {Function} callback 回调函数
*/
on(event, callback) {
if (this.eventListeners[event]) {
this.eventListeners[event].push(callback);
}
}
/**
* 移除事件监听器
* @param {string} event 事件名称
* @param {Function} callback 回调函数
*/
off(event, callback) {
if (this.eventListeners[event]) {
const index = this.eventListeners[event].indexOf(callback);
if (index > -1) {
this.eventListeners[event].splice(index, 1);
}
}
}
/**
* 触发事件
* @param {string} event 事件名称
* @param {...any} args 参数
*/
emit(event, ...args) {
if (this.eventListeners[event]) {
this.eventListeners[event].forEach(callback => {
try {
callback(...args);
} catch (err) {
console.error(`Error in ${event} event listener:`, err);
}
});
}
}
/**
* 更新连接状态
* @param {string} status 新状态
*/
setConnectionStatus(status) {
if (this.connectionStatus !== status) {
this.connectionStatus = status;
this.emit('statusChange', status);
}
}
/**
* 连接 MQTT 服务器
*/
connect() {
const { host, port, path, clientId, username, password, protocolVersion, clean, cleanStart, keepAlive, sessionExpiryInterval, connectTimeout } = this.config;
if (!host) {
this.emit('error', new Error('请输入主机地址'));
return;
}
this.setConnectionStatus('connecting');
this.emit('statusChange', 'connecting');
// 构建 WebSocket URL
let urlPath = path;
if (!urlPath.startsWith('/')) {
urlPath = '/' + urlPath;
}
let url = `ws://${host}:${port}${urlPath}`;
// 关闭现有连接
if (this.socket) {
this.socket.close();
this.socket = null;
}
// 建立 WebSocket 连接
this.socket = uni.connectSocket({
url,
protocols: ['mqttv3.1.1', 'mqtt'],
header: {
'content-type': 'application/octet-stream'
},
success: () => {
// 连接请求已发送
},
fail: (err) => {
this.setConnectionStatus('error');
this.emit('error', new Error(`WebSocket 连接失败: ${JSON.stringify(err)}`));
}
});
// 设置连接超时
if (connectTimeout > 0) {
this.connectTimeoutTimer = setTimeout(() => {
if (this.connectionStatus === 'connecting') {
this.setConnectionStatus('error');
this.emit('error', new Error(`连接超时 (${connectTimeout}ms)`));
if (this.socket) {
this.socket.close();
this.socket = null;
}
}
}, connectTimeout);
}
// WebSocket 事件监听
this.socket.onOpen(() => {
if (this.connectTimeoutTimer) {
clearTimeout(this.connectTimeoutTimer);
this.connectTimeoutTimer = null;
}
this.sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, keepAlive, sessionExpiryInterval);
});
this.socket.onMessage((res) => {
this.handleMQTTMessage(res.data);
});
this.socket.onError((err) => {
if (this.connectTimeoutTimer) {
clearTimeout(this.connectTimeoutTimer);
this.connectTimeoutTimer = null;
}
this.stopPingInterval();
this.setConnectionStatus('error');
this.emit('error', new Error(`WebSocket 错误: ${JSON.stringify(err)}`));
});
this.socket.onClose((res) => {
if (this.connectTimeoutTimer) {
clearTimeout(this.connectTimeoutTimer);
this.connectTimeoutTimer = null;
}
this.stopPingInterval();
if (this.connectionStatus === 'connected' && this.config.reconnectPeriod > 0) {
this.reconnectTimer = setTimeout(() => {
this.connect();
}, this.config.reconnectPeriod);
}
this.setConnectionStatus('disconnected');
this.emit('disconnect', res);
});
}
/**
* 断开连接
*/
disconnect() {
if (this.connectTimeoutTimer) {
clearTimeout(this.connectTimeoutTimer);
this.connectTimeoutTimer = null;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.stopPingInterval();
if (this.socket) {
this.sendDisconnectPacket();
setTimeout(() => {
this.socket.close();
this.socket = null;
this.setConnectionStatus('disconnected');
this.subscribedTopics = [];
}, 100);
}
}
/**
* 发布消息
* @param {string} topic 主题
* @param {string} message 消息内容
* @param {number} qos QoS 等级 (0, 1, 2)
*/
publish(topic, message, qos = 0) {
if (this.connectionStatus !== 'connected') {
this.emit('error', new Error('请先连接'));
return;
}
if (!topic) {
this.emit('error', new Error('请输入主题'));
return;
}
try {
this.sendPublishPacket(topic, message, qos);
this.emit('message', { type: 'send', topic, message });
} catch (error) {
this.emit('error', new Error(`发布失败: ${error.message}`));
}
}
/**
* 订阅主题
* @param {string} topic 主题
* @param {number} qos QoS 等级 (0, 1, 2)
*/
subscribe(topic, qos = 0) {
if (this.connectionStatus !== 'connected') {
this.emit('error', new Error('请先连接'));
return;
}
if (!topic) {
this.emit('error', new Error('请输入订阅主题'));
return;
}
try {
this.sendSubscribePacket(topic, qos);
if (!this.subscribedTopics.includes(topic)) {
this.subscribedTopics.push(topic);
}
} catch (error) {
this.emit('error', new Error(`订阅失败: ${error.message}`));
}
}
/**
* 取消订阅
* @param {string} topic 主题
*/
unsubscribe(topic) {
if (this.connectionStatus !== 'connected') {
this.emit('error', new Error('请先连接'));
return;
}
if (!topic) {
this.emit('error', new Error('请输入取消订阅的主题'));
return;
}
try {
this.sendUnsubscribePacket(topic);
const index = this.subscribedTopics.indexOf(topic);
if (index > -1) {
this.subscribedTopics.splice(index, 1);
}
} catch (error) {
this.emit('error', new Error(`取消订阅失败: ${error.message}`));
}
}
/**
* 获取当前连接状态
* @returns {string} 连接状态
*/
getStatus() {
return this.connectionStatus;
}
/**
* 获取已订阅的主题列表
* @returns {Array} 主题列表
*/
getSubscribedTopics() {
return [...this.subscribedTopics];
}
// ========== 内部方法 ==========
/**
* 发送数据包
* @param {string|ArrayBuffer|Array} packet 数据包
*/
sendPacket(packet) {
if (!this.socket) {
return;
}
let arrayBuffer;
if (typeof packet === 'string') {
const bytes = this.ab2buffer(packet);
arrayBuffer = new Uint8Array(bytes).buffer;
} else if (packet instanceof ArrayBuffer) {
arrayBuffer = packet;
} else if (Array.isArray(packet)) {
arrayBuffer = new Uint8Array(packet).buffer;
} else {
this.emit('error', new Error('未知的数据包格式'));
return;
}
uni.sendSocketMessage({
data: arrayBuffer,
success: () => {
// 发送成功
},
fail: (err) => {
this.emit('error', new Error(`数据发送失败: ${JSON.stringify(err)}`));
}
});
}
/**
* 处理 MQTT 消息
* @param {*} data 原始数据
*/
handleMQTTMessage(data) {
try {
const buffer = this.ab2buffer(data);
if (buffer.length === 0) {
return;
}
const packetType = buffer[0] & 0xF0;
const flags = buffer[0] & 0x0F;
let pos = 1;
let remainingLength = 0;
let multiplier = 1;
let encodedByte;
do {
if (pos >= buffer.length) {
return;
}
encodedByte = buffer[pos];
remainingLength += (encodedByte & 0x7F) * multiplier;
multiplier *= 128;
pos++;
} while ((encodedByte & 0x80) !== 0 && pos < buffer.length);
if (packetType === 0x20) {
this.handleConnackPacket(buffer, pos);
} else if (packetType === 0x30 || packetType === 0x32) {
this.handlePublishPacket(buffer, pos, flags);
} else if (packetType === 0x40) {
this.handlePubackPacket(buffer, pos);
} else if (packetType === 0x90) {
this.handleSubackPacket(buffer, pos);
} else if (packetType === 0xB0) {
this.handleUnsubackPacket(buffer, pos);
} else if (packetType === 0xD0) {
// 心跳响应
} else if (packetType === 0xE0) {
this.handleDisconnectPacket(buffer, pos);
}
} catch (error) {
this.emit('error', new Error(`处理 MQTT 消息时出错: ${error.message}`));
}
}
/**
* 处理 CONNACK 包
*/
handleConnackPacket(buffer, pos) {
if (buffer.length < pos + 2) {
this.emit('error', new Error('CONNACK 包长度不足'));
return;
}
const flags = buffer[pos++];
const reasonCode = buffer[pos++];
const reasonCodes = {
0: '成功',
128: '协议版本错误',
129: '客户端ID被拒绝',
130: '服务器不可用',
131: '用户名密码错误',
132: '未授权',
133: '服务器未找到',
134: '服务器不可用',
135: '用户名密码无效'
};
const reasonText = reasonCodes[reasonCode] || `未知错误 (${reasonCode})`;
if (reasonCode === 0) {
this.setConnectionStatus('connected');
this.emit('connect');
this.startPingInterval();
} else {
this.setConnectionStatus('error');
this.emit('error', new Error(`MQTT 连接失败: ${reasonText}`));
}
}
/**
* 处理 PUBLISH 包
*/
handlePublishPacket(buffer, pos, flags) {
try {
const qos = (flags >> 1) & 0x03;
if (buffer.length < pos + 2) {
this.emit('error', new Error('PUBLISH 包主题长度字段不完整'));
return;
}
const topicLen = buffer[pos++] * 256 + buffer[pos++];
if (buffer.length < pos + topicLen) {
this.emit('error', new Error('PUBLISH 包主题数据不完整'));
return;
}
const topicBytes = buffer.slice(pos, pos + topicLen);
const topic = this.bytesToString(topicBytes);
pos += topicLen;
let packetId = 0;
if (qos > 0) {
if (buffer.length < pos + 2) {
this.emit('error', new Error('PUBLISH 包包ID字段不完整'));
return;
}
packetId = buffer[pos++] * 256 + buffer[pos++];
}
const payloadBytes = buffer.slice(pos);
const payload = this.bytesToString(payloadBytes);
this.emit('message', { type: 'receive', topic, payload });
if (qos === 1) {
this.sendPubackPacket(packetId);
}
} catch (error) {
this.emit('error', new Error(`处理 PUBLISH 包时出错: ${error.message}`));
}
}
/**
* 处理 PUBACK 包
*/
handlePubackPacket(buffer, pos) {
if (buffer.length >= pos + 2) {
const packetId = buffer[pos++] * 256 + buffer[pos++];
// 可以触发事件
}
}
/**
* 处理 SUBACK 包
*/
handleSubackPacket(buffer, pos) {
if (buffer.length >= pos + 2) {
const packetId = buffer[pos++] * 256 + buffer[pos++];
this.emit('subscribed', packetId);
}
}
/**
* 处理 UNSUBACK 包
*/
handleUnsubackPacket(buffer, pos) {
if (buffer.length >= pos + 2) {
const packetId = buffer[pos++] * 256 + buffer[pos++];
this.emit('unsubscribed', packetId);
}
}
/**
* 处理 DISCONNECT 包
*/
handleDisconnectPacket(buffer, pos) {
this.setConnectionStatus('disconnected');
this.emit('disconnect', { type: 'server' });
}
/**
* 发送 CONNECT 包
*/
sendConnectPacket(clientId, username, password, protocolVersion, clean, cleanStart, keepAlive, sessionExpiryInterval) {
try {
let packet = '';
packet += String.fromCharCode(0x10);
let variableHeader = '';
variableHeader += String.fromCharCode(0x00, 0x04);
variableHeader += 'MQTT';
variableHeader += String.fromCharCode(protocolVersion);
let flags = 0x00;
if (protocolVersion === 5) {
if (cleanStart) flags |= 0x02;
} else {
if (clean) flags |= 0x02;
}
if (username) flags |= 0x80;
if (password) flags |= 0x40;
variableHeader += String.fromCharCode(flags);
variableHeader += String.fromCharCode((keepAlive >> 8) & 0xFF, keepAlive & 0xFF);
let payload = this.encodeString(clientId);
if (username) payload += this.encodeString(username);
if (password) payload += this.encodeString(password);
const remainingLength = variableHeader.length + payload.length;
packet += this.encodeRemainingLength(remainingLength);
packet += variableHeader;
packet += payload;
this.sendPacket(packet);
} catch (error) {
this.emit('error', new Error(`发送 CONNECT 包失败: ${error.message}`));
}
}
/**
* 发送 DISCONNECT 包
*/
sendDisconnectPacket() {
let packet = String.fromCharCode(0xE0);
packet += String.fromCharCode(0x00);
this.sendPacket(packet);
}
/**
* 发送 PUBLISH 包
*/
sendPublishPacket(topic, message, qos) {
let packet = '';
const flags = 0x30 | (qos << 1);
packet += String.fromCharCode(flags);
const topicBytes = this.encodeString(topic);
const msgBytes = this.stringToBytes(message);
let remainingLength = topicBytes.length + msgBytes.length;
if (qos > 0) {
remainingLength += 2;
packet += this.encodeRemainingLength(remainingLength);
packet += topicBytes;
packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF);
this.messageId++;
} else {
packet += this.encodeRemainingLength(remainingLength);
packet += topicBytes;
}
packet += msgBytes;
this.sendPacket(packet);
}
/**
* 发送 SUBSCRIBE 包
*/
sendSubscribePacket(topic, qos) {
let packet = String.fromCharCode(0x82);
const topicBytes = this.encodeString(topic);
let remainingLength = 2 + 1 + topicBytes.length + 1;
packet += this.encodeRemainingLength(remainingLength);
packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF);
packet += String.fromCharCode(0x00);
packet += topicBytes;
packet += String.fromCharCode(qos);
this.messageId++;
this.sendPacket(packet);
}
/**
* 发送 UNSUBSCRIBE 包
*/
sendUnsubscribePacket(topic) {
let packet = String.fromCharCode(0xA2);
const topicBytes = this.encodeString(topic);
let remainingLength = 2 + 1 + topicBytes.length;
packet += this.encodeRemainingLength(remainingLength);
packet += String.fromCharCode((this.messageId >> 8) & 0xFF, this.messageId & 0xFF);
packet += String.fromCharCode(0x00);
packet += topicBytes;
this.messageId++;
this.sendPacket(packet);
}
/**
* 发送 PUBACK 包
*/
sendPubackPacket(packetId) {
try {
let packet = String.fromCharCode(0x40);
packet += String.fromCharCode(0x02);
packet += String.fromCharCode((packetId >> 8) & 0xFF, packetId & 0xFF);
this.sendPacket(packet);
} catch (error) {
this.emit('error', new Error(`发送 PUBACK 失败: ${error.message}`));
}
}
/**
* 发送 PINGREQ 包
*/
sendPingPacket() {
try {
let packet = String.fromCharCode(0xC0);
packet += String.fromCharCode(0x00);
this.sendPacket(packet);
} catch (error) {
this.emit('error', new Error(`发送 PINGREQ 失败: ${error.message}`));
}
}
/**
* 启动心跳定时器
*/
startPingInterval() {
this.stopPingInterval();
const { keepAlive } = this.config;
if (keepAlive > 0) {
const interval = (keepAlive * 1000) * 0.8;
this.pingIntervalTimer = setInterval(() => {
if (this.connectionStatus === 'connected') {
this.sendPingPacket();
}
}, interval);
}
}
/**
* 停止心跳定时器
*/
stopPingInterval() {
if (this.pingIntervalTimer) {
clearInterval(this.pingIntervalTimer);
this.pingIntervalTimer = null;
}
}
// ========== 工具方法 ==========
/**
* 将各种数据格式转换为字节数组
*/
ab2buffer(data) {
try {
if (typeof data === 'string') {
const bytes = [];
for (let i = 0; i < data.length; i++) {
bytes.push(data.charCodeAt(i) & 0xFF);
}
return bytes;
} else if (data instanceof ArrayBuffer) {
return Array.from(new Uint8Array(data));
} else if (Array.isArray(data)) {
return data;
} else {
return Array.from(data);
}
} catch (error) {
this.emit('error', new Error(`数据转换失败: ${error.message}`));
return [];
}
}
/**
* 字节数组转字符串
*/
bytesToString(bytes) {
try {
if (!bytes || bytes.length === 0) return '';
let str = '';
for (let i = 0; i < bytes.length; i++) {
const byte = bytes[i] & 0xFF;
if (byte >= 32 && byte <= 126) {
str += String.fromCharCode(byte);
} else {
str += `\\x${byte.toString(16).padStart(2, '0')}`;
}
}
return str;
} catch (error) {
this.emit('error', new Error(`字节转字符串失败: ${error.message}`));
return '';
}
}
/**
* 编码字符串(添加长度前缀)
*/
encodeString(str) {
let result = '';
const bytes = this.stringToBytes(str);
result += String.fromCharCode((bytes.length >> 8) & 0xFF);
result += String.fromCharCode(bytes.length & 0xFF);
for (let i = 0; i < bytes.length; i++) {
result += String.fromCharCode(bytes[i]);
}
return result;
}
/**
* 字符串转字节数组
*/
stringToBytes(str) {
const bytes = [];
for (let i = 0; i < str.length; i++) {
bytes.push(str.charCodeAt(i));
}
return bytes;
}
/**
* 编码剩余长度
*/
encodeRemainingLength(length) {
let result = '';
do {
let byte = length % 128;
length = Math.floor(length / 128);
if (length > 0) {
byte |= 0x80;
}
result += String.fromCharCode(byte);
} while (length > 0);
return result;
}
}
// 导出类
export default MQTTClient;

View File

@ -0,0 +1,36 @@
{
"name": "uni-plugin-mqtt-websocket",
"version": "1.0.0",
"description": "基于 WebSocket 的 MQTT 客户端插件,支持 uni-app 多端运行",
"main": "js_sdk/index.js",
"keywords": [
"uni-app",
"mqtt",
"websocket",
"iot",
"real-time"
],
"author": "开发者",
"license": "MIT",
"repository": {
"type": "git",
"url": ""
},
"bugs": {
"url": ""
},
"homepage": "",
"uni_modules": {
"id": "wsmqtt",
"displayName": "MQTT WebSocket 客户端",
"description": "基于 WebSocket 的 MQTT 客户端插件",
"category": "network",
"platforms": {
"app-plus": {},
"mp-weixin": {},
"mp-alipay": {},
"mp-baidu": {},
"mp-toutiao": {}
}
}
}