mosca是MQTT官方推荐的MQTT代理(服务端)软件之一。mosca是JavaScript编写Node.js模块,即可以全局安装做为MQTT服务端使用,又可以将其集成到项目中,以实现更多的个性化功能。
1. 做为独立MQTT服务端
mosca是一个Node.js模块,要在服务器单独使用,至少应安装 Node运行环境。
安装mosca
使用npm安装:
npm install mosca bunyan -g
也可以直接使用git安装:
git clone git://github.com/mcollina/mosca.git cd mosca npm install
使用
mosca单独使用时,可以像下面这样运行,并开始接受客户端连接:
mosca -v | bunyan
配置
执行mosca --help可以查看使用帮助:
mosca --help
Usage: mosca [options] [command]
Commands:
adduser <user> <pass> 添加用户到认证文件
rmuser <user> 从认证文件中移除用户
start 开始服务 (可选)
Options:
-h, --help 显示使用信息 information
-V, --version 显示版本号
-p, --port <n> 监听的端口号
--host <IP> 监听的主机各
--parent-port <n> 要连接的父端口号
--parent-host <s> 要连接的父主机名
--parent-prefix <s> 在父代理中使用的前缀
--credentials <file> 认证文件
--authorize-publish <pattern> 指定发布主题时的认证方式(pattern)
--authorize-subscribe <pattern> 指定用户订阅主题时的参数(pattern)
--key <file> 服务器的私钥(使用tls时)
--cert <file> 服务器的证书(使用tls时)
--secure-port <n> 监听的TLS端口号
--non-secure 同时启动安全和非安装两种服务
--http-port <n> 在指定的端口启webSocket服务
--https-port <n> 在指定的端口启安全的webSocket服务
--http-static <directory> 为某些静态文件启动webSocket服务
--https-static <directory> 为某些静态文件启动安全的webSocket服务
--http-bundle 为HTTP客户端绑定/mqtt.js客户端库
--https-bundle 为HTTPs客户端绑定/mqtt.js客户端库
--only-http 仅启动HTTPwebSocket服务
--disable-stats 禁$SYS的发布状态
--broker-id <id> 指定$SYS/<id>命名空间
-c, --config <c> 要使用的配置文件
-d, --db <path> 指定消息存储
-v, --verbose 设置 bunyan 日志到INFO输出
--very-verbose 设置 bunyan 日志到DEBUG输出
充分利用mosca,需要为其指定一个配置文件。下面是一个基于Redis存储的配置文件:
var mosca = require('mosca');
module.exports = {
port: 4883,
// host: "127.0.0.1", //指定要绑定的主机名
id: 'mymosca', //在发布时使用的主题命名空间ID $SYS/id
stats: true, //在主题命名空间使用的状态 $SYS/id
logger: {
level: 'info'
},
backend: {
type: 'redis',
port: 6379,
host: 'localhost',
return_buffers: true
},
persistence: {
factory: mosca.persistence.Redis,
port: 6379,
host: 'localhost'
},
secure: {
keyPath: "/path/to/key",
certPath: "/path/to/cert"
}
};
mosca基于Ascoltatori模块开发,可以支持基于redis、MongoDB、AMQP、ZeroMQ和MQTT代理等方式的消息持久化。
认证
mosca支持将认证用户添加到指定的json文件中,可以命令行中使用如下方式创建文件:
// 添加用户 $ mosca adduser <user> <pass> --credentials ./credentials.json // 添加用户到具体的一个主题 $ mosca adduser myuser mypass --credentials ./credentials.json \ --authorize-publish 'hello/*' --authorize-subscribe 'hello/*' // 移除用户 $ mosca rmuser myuser --credentials ./credentials.json // 从认证文件启用Mosca认证 $ mosca --credentials ./credentials.json
2. 集成到项目中
mosca同样支持将其集成到现有的Node.js项目中,集成到现有项目后,需要编写实现MQTT服务端的代码。
安装设置
npm install mosca --save
获取模块引用、配置pub/sub设置
var mosca = require('mosca');
var pubsubsettings = {
//using ascoltatore
type: 'mongo',
url: 'mongodb://localhost:27017/mqtt',
pubsubCollection: 'ascoltatori',
mongo: {}
};
应用设置
var moscaSettings = {
port: 1883, //mqtt端口
backend: pubsubsettings //应用上在的pub/set
};
var server = new mosca.Server(moscaSettings); // 启动mosca
server.on('ready', setup); //初始化完成后启动setup()
// 准备完成后启动服务
function setup() {
console.log('Mosca server is up and running')
}
向客户端发送数据
与MQTT标准一致,发送数据使用publish方法。发送数据时可以指定:topic、payload、qos及标识retain标识
var message = {
topic: '/hello/world',
payload: 'abcde', // or a Buffer
qos: 0, // 0, 1, or 2
retain: false // or true
};
server.publish(message, function() {
console.log('done!');
});
接收客户端数据
mosca继承了EventEmitter,其接收客户端数据是基于事件监听的。
// 消息发布后触发
server.on('published', function(packet, client) {
console.log('Published', packet);
console.log('Client', client);
});
// 客户端连接后触发
server.on('clientConnected', function(client) {
console.log('Client Connected:', client.id);
});
// 客户端断开连接后触发
server.on('clientDisconnected', function(client) {
console.log('Client Disconnected:', client.id);
});
完整代码
完整代码整理如下:
var mosca = require('mosca')
var ascoltatore = {
//using ascoltatore
type: 'mongo',
url: 'mongodb://localhost:27017/mqtt',
pubsubCollection: 'ascoltatori',
mongo: {}
};
var moscaSettings = {
port: 1883,
backend: ascoltatore,
persistence: {
factory: mosca.persistence.Mongo,
url: 'mongodb://localhost:27017/mqtt'
}
};
var server = new mosca.Server(moscaSettings);
server.on('ready', setup);
server.on('clientConnected', function(client) {
console.log('client connected', client.id);
});
// 收到消息后触发
server.on('published', function(packet, client) {
console.log('Published', packet.payload);
});
// MQTT服务端准备完成后触发
function setup() {
console.log('Mosca server is up and running')
}
下面是一个基于Redis的示例:
var mosca = require('mosca')
var ascoltatore = {
type: 'redis',
redis: require('redis'),
db: 12,
port: 6379,
return_buffers: true, // 处理二进制的payload
host: "localhost"
};
var moscaSettings = {
port: 1883,
backend: ascoltatore,
persistence: {
factory: mosca.persistence.Redis
}
};
var server = new mosca.Server(moscaSettings);
server.on('ready', setup);
server.on('clientConnected', function(client) {
console.log('client connected', client.id);
});
// 收到消息时触发
server.on('published', function(packet, client) {
console.log('Published', packet.payload);
});
// MQTT服务端准备完成后触发
function setup() {
console.log('Mosca server is up and running')
}
注意,如果使用是非标准的Redis或Redis与MQTT不在同一台服务器,需要像下面这样设置persistence:
persistence: {
factory: mosca.persistence.Redis,
host: 'your redis host',
port: 'your redis port'
}
关于数据持久化的更多高级用法,请参考:Mosca advanced usage
