MQTT.js是JavaScript编写的,实现了MQTT协议客户端功能的模块,可以在Node.js或浏览器环境中使用。在Node.js中使用时,即可以-g全局安装以命令行的形式使用,又可以将其集成到项目中调用。
1. 安装与使用
1.1 在项目中安装
MQTT.jsnpm包名为mqtt,安装命令如下:
npm install mqtt --save
使用示例
使用MQTT.js实现一个消息发布者及订阅者:
var mqtt = require('mqtt');
var client = mqtt.connect('mqtt://test.mosquitto.org');
client.on('connect', function () {
//订阅presence主题
client.subscribe('presence');
//向presence主题发布消息
client.publish('presence', 'Hello mqtt');
});
client.on('message', function (topic, message) {
//收到的消息是一个Buffer
console.log(message.toString());
client.end();
});
输出:
Hello mqtt
在上例中test.mosquitto.org是官方提供的MQTT测试服务器,还可以使用test.mosca.io网址进行测试。
了解更多关于MQTT相关介绍请参考:MQTT协议-MQTT协议简介及协议原理
1.2 做为命令行工具使用
MQTT.js支持以命令行模式与服务端通信,以命令行模式使用时,模块需要全局安装:
npm install mqtt -g
安装后,就可以在控制台使用:
mqtt sub -t 'hello' -h 'test.mosquitto.org' -v
下面是另一种使用方式:
mqtt pub -t 'hello' -h 'test.mosquitto.org' -m 'from MQTT.js'
更多命令行使用方法可以执行mqtt help 命令查看。
1.3 在浏览器中使用
在浏览器环境中什么MQTT.js首先需要按AMD或CommandJs规范导出。
Browserify导出
Browserify模块可以将MQTT.js绑定为浏览器可用的,也可以导出为一个单独的浏览器端模块。其导出的模块兼容AMD或CMD标准,且会在添加对象到全局空间。
npm install -g browserify // 安装 browserify cd node_modules/mqtt npm install . // 安装 dev dependencies browserify mqtt.js -s mqtt > browserMqtt.js // 需要在客户端使用的mqtt
Webpack导出
与Browserify类似,该模块也会导出一个浏览器端使用的类库,且会在添加对象到全局空间,导出后可以像var mqtt = xxx这样使用MQTT.js。导出时可以设置AMD、CMD或其它导出格式。
npm install -g webpack // install webpack cd node_modules/mqtt npm install . // install dev dependencies webpack mqtt.js ./browserMqtt.js --output-library mqtt
完成导出后,就可以像在Node中那样使用mqtt.js的API:
<html>
<head>
<title>test Ws mqtt.js</title>
</head>
<body>
<script src="./browserMqtt.js"></script>
<script>
var client = mqtt.connect(); // you add a ws:// url here
client.subscribe("mqtt/demo");
client.on("message", function(topic, payload) {
alert([topic, payload].join(": "));
client.end();
});
client.publish("mqtt/demo", "hello world!");
</script>
</body>
</html>
2. MQTT.js相关API
mqtt.connect()mqtt.Client()mqtt.Client#publish()mqtt.Client#subscribe()mqtt.Client#unsubscribe()mqtt.Client#end()mqtt.Client#handleMessage()mqtt.Store()mqtt.Store#put()mqtt.Store#del()mqtt.Store#createStream()mqtt.Store#close()
mqtt.connect([url], options)
通过指定的url和options连接到mqtt代理,并返回一个Client对象
URL可以支持协议有:'mqtt'、'mqtts'、'tcp'、'tls'、'ws'、'wss'。也可以是一个由Url.parse()返回的对象,这时可以将url和options合并为一个对象。
mqtt.Client(streamBuilder, options)
Client类包装了一个到代理(服务器)的客户端连接,可以支持任何形式的传输协议(TCP、TLS、WebSocket等)
Client类会自动做以下处理:
- 到服务器的定时ping
- QoS流
- 自动重连
- 在连接前开始消息发布
参数说明:
streamBuilder:返回支持连接事件的流类的子类函数。一个典型的net.socket。options客户端连接选项。默认值:keepalive:默认10秒,设为0时将禁用clientId:'mqttjs_' + Math.random().toString(16).substr(2, 8)protocolId:'MQTT'protocolVersion:4clean:true。设置为false时,收到QoS为1和2的消息时,将下线reconnectPeriod:1000毫秒,表示两次重连间隔connectTimeout:30*1000毫秒,表示等待收到CONNACK确认包的时间username:代理(服务器)所需要的用户名,仅需要时password:代理(服务器)所需要的密码,仅需要时incomingStore:将要到达包的StoreoutgoingStore:将要发出包的Storewill:客户端连接失效时自动发送给服务器的消息。格式如下:topic:发布消息的主题payload:发布的消息内容qos:QoSretain:retain标识
如果要使用mqtts(使用tls的mqtt),相关设置通过options对象传给 tls.connect().。如果使用自签名证书,需要将rejectUnauthorized设置为false
如果所要连接的服务器只支持MQTT 3.1(非V3.1.1),需要如下设置:
{
protocolId: 'MQIsdp',
protocolVersion: 3
}
事件:'connect'
function(connack) {}
连接成功时会发送此事件
connack收到的connack包,当clean选项为false且服务器有一个之前的clientId连接选项,这时connack.sessionPresent选项为true
事件:'reconnect'
function() {}
重新连接时触发
事件:'close'
function() {}
断开连接后触发
事件:'offline'
function() {}
客户端下线时触发
事件:'error'
function(error) {}
连接失败或发生错误时触发
事件:'message'
function(topic, message, packet) {}
客户端收到消息时触发
topic:String收到消息的主题message:String或Buffer消息的内容packet:收到的包
mqtt.Client#publish(topic, message, [options], [callback])
发布消息
topic:String发布消息的主题message:String或Buffer发布消息的内容options:消息选项,包括:qos:QoS级别,默认0retain:retain标识,默认false
callback:回调会在QoS处理完成或在下次处理前发生(QoS为0时)
mqtt.Client#subscribe(topic/topic array/topic object, [options], [callback])
订阅一个或一组主题(Topic)
topic:String或Array表示要订阅的主题options:订阅主题的选项:qos:QoS级别,默认0
callback:function(err, granted)回调函数,suback时触发err:订阅时发生的错误granted:订阅成功时触发[topic, qos]
mqtt.Client#unsubscribe(topic/topic array, [options], [callback])
退订一个或一组主题(Topic)
topic:String要取消订阅的主题callback:回调函数,退订成功后触发
mqtt.Client#end([force], [cb])
关闭客户端连接,选项如下:
force:可选,强制关闭而无需等待传送中的消息完成cb:回调函数,连接关闭后触发,可选
mqtt.Client#handleMessage(packet, callback)
关闭客户端连接,选项如下:
mqtt.Store()
实现消息存储
mqtt.Store#put(packet, callback)
添加一个消息到消息存储中,数据包表示有messageId属性的消息。回调函数在存储完成后触发。
mqtt.Store#del(packet, cb)
从消息存储中移除一个数据包。回调函数在移除完成后触发。
mqtt.Store#createStream()
在所有包存储完成后创建一个流
mqtt.Store#close(cb)
关闭消息存储
