PomeloApplicationgetBasefilterbeforeafterglobalFilterglobalBeforeglobalAfterrpcBeforerpcAfterloadloadConfigroutebeforeStopHookstartsetgetenableddisabledenabledisableconfigureregisterAdminusetransactiongetMastergetCurServergetServerIdgetServerTypegetServersgetServersFromConfiggetServerTypesgetServerByIdgetServerFromConfiggetServersByTypeisFrontendisBackendisMasteraddServersremoveServersreplaceServersaddCronsremoveCrons
BackendSessionServiceBackendSessionSessionServiceChannelServiceChannel
1. Pomelo
导出createApplication()
Pomelo是引用'pomelo'模块后,所获取的一个顶级对象。
createApp
Pomelo.createApp()
- 类型:方法
创建 pomelo 应用
通过Pomelo.createApp()所创建的应用,将会做为一个只读属性挂到Pomelo上,并可通过Pomelo.app来访问所创建的应用。如下所示:
const pomelo = require('pomelo');
// 访问应用
pomelo.app;
源码
Pomelo.createApp = function (opts) {
var app = application;
app.init(opts);
self.app = app;
return app;
};
2. Application
Application 原型
getBase
Application.getBase()
- 类型:方法
获取应用的基础路径
Pomelo 应用启动后,可通过以下方式获取路径:
app.getBase(); // /home/game
源码
Application.getBase = function() {
return this.get(Constants.Reserved.BASE);
};
filter
Application.filter()
- 类型:方法
- 参数
filter所要添加的筛选器
将一个filter添加到beforeFilter和afterFilter
源码
Application.filter = function (filter) {
this.before(filter);
this.after(filter);
};
before
Application.before()
- 类型:方法
- 参数
bf所要添加的beforeFilter筛选器,其签名为bf(msg, session, next)
添加一个beforeFilter
源码
Application.before = function (bf) {
addFilter(this, Constants.KeyWords.BEFORE_FILTER, bf);
};
after
Application.after()
- 类型:方法
- 参数
af所要添加的afterFilter筛选器,其签名为af(err, msg, session, resp, nex)
添加一个afterFilter
源码
Application.after = function (af) {
addFilter(this, Constants.KeyWords.AFTER_FILTER, af);
};
globalFilter
Application.globalFilter()
- 类型:方法
- 参数
filter所要添加的筛选器
添加一个全局筛选器到全局beforeFilter和全局afterFilter
源码
Application.globalFilter = function (filter) {
this.globalBefore(filter);
this.globalAfter(filter);
};
globalBefore
Application.globalBefore()
- 类型:方法
- 参数
bf所要添加的beforeFilter筛选器,其签名为bf(msg, session, next)
添加一个全局beforeFilter
源码
Application.globalBefore = function (bf) {
addFilter(this, Constants.KeyWords.GLOBAL_BEFORE_FILTER, bf);
};
globalAfter
Application.globalAfter()
- 类型:方法
- 参数
af所要添加的afterFilter筛选器,其签名为af(err, msg, session, resp, nex)
添加一个全局afterFilter
源码
Application.globalAfter = function (af) {
addFilter(this, Constants.KeyWords.GLOBAL_AFTER_FILTER, af);
};
rpcBefore
Application.rpcBefore()
- 类型:方法
- 参数
bf所要添加的 rpcbeforeFilter筛选器,其签名为bf(serverId, msg, opts, next)
添加一个rpc beforeFilter
源码
Application.rpcBefore = function(bf) {
addFilter(this, Constants.KeyWords.RPC_BEFORE_FILTER, bf);
};
rpcAfter
Application.rpcAfter()
- 类型:方法
- 参数
af所要添加的 rpcafterFilter筛选器,其签名为bf(serverId, msg, opts, next)
- 添加一个rpc
afterFilter
源码
Application.rpcAfter = function(af) {
addFilter(this, Constants.KeyWords.RPC_AFTER_FILTER, af);
};
rpcFilter
Application.rpcFilter()
- 类型:方法
- 参数
filter所要添加到 rpcbeforeFilter和afterFilter的筛选器
- 添加一个rpc filter 到
beforeFilter和afterFilter
源码
Application.rpcFilter = function(filter) {
this.rpcBefore(filter);
this.rpcAfter(filter);
};
load
Application.load()
- 类型:方法
- 参数
name- 可选,组件名component- 组件实例,或组件的工厂方法opts- 可选,提供给工厂方法的构建参数
- 加载组件
源码
Application.load = function(name, component, opts) {
if(typeof name !== 'string') {
opts = component;
component = name;
name = null;
if(typeof component.name === 'string') {
name = component.name;
}
}
if(typeof component === 'function') {
component = component(this, opts);
}
if(!name && typeof component.name === 'string') {
name = component.name;
}
if(name && this.components[name]) {
// ignore duplicat component
logger.warn('ignore duplicate component: %j', name);
return;
}
this.loaded.push(component);
if(name) {
// components with a name would get by name throught app.components later.
this.components[name] = component;
}
return this;
};
loadConfig
Application.loadConfig()
- 类型:方法
- 参数
key- 环境变量 keyval- 环境变量值
- 加载 JSON 文件并设置
源码
Application.loadConfig = function (key, val) {
var env = this.get(Constants.Reserved.ENV);
val = require(val);
if (val[env]) {
val = val[env];
}
this.set(key, val);
};
route
Application.route()
- 类型:方法
- 参数
serverType- 字符串,表示服务器类型routeFunc- 路由函数,其签名为routeFunc(session, msg, app, cb)
- 为指定类型的服务器设置路由函数
示例
app.route('area', routeFunc);
var routeFunc = function(session, msg, app, cb) {
// 所到 area的请求都会被路由到第一台 area 服务器
var areas = app.getServersByType('area');
cb(null, areas[0].id);
};
源码
Application.route = function(serverType, routeFunc) {
var routes = this.get(Constants.KeyWords.ROUTE);
if(!routes) {
routes = {};
this.set(Constants.KeyWords.ROUTE, routes);
}
routes[serverType] = routeFunc;
return this;
};
beforeStopHook
Application.beforeStopHook()
- 类型:方法
- 参数
fun- 服务器停止前所调用的函数
- 设置在服务器停止前所调用的钩子函数
源码
Application.beforeStopHook = function(fun) {
logger.warn('this method was deprecated in pomelo 0.8');
if(!!fun && typeof fun === 'function') {
this.set(Constants.KeyWords.BEFORE_STOP_HOOK, fun);
}
};
start
Application.start()
- 类型:方法
- 参数
cb- 回调函数
- 启动应用。应用启动后会加载默认组件,并启动所有已加载的组件
源码
Application.start = function(cb) {
this.startTime = Date.now();
if(this.state > STATE_INITED) {
utils.invokeCallback(cb, new Error('application has already start.'));
return;
}
appUtil.loadDefaultComponents(this);
var self = this;
var startUp = function() {
appUtil.optComponents(self.loaded, Constants.Reserved.START, function(err) {
self.state = STATE_START;
if(err) {
utils.invokeCallback(cb, err);
} else {
logger.info('%j enter after start...', self.getServerId());
self.afterStart(cb);
}
});
};
var beforeFun = this.lifecycleCbs[Constants.LIFECYCLE.BEFORE_STARTUP];
if(!!beforeFun) {
beforeFun.call(null, this, startUp);
} else {
startUp();
}
};
set
Application.set()
- 类型:方法
- 参数
setting- 应用的设置val- 所设置的值attach- 是否将设置附加到应用
- 指定
setting的值为val,或返回已设置的值
示例
app.set('key1', 'value1');
app.get('key1'); // 'value1'
app.key1; // undefined
app.set('key2', 'value2', true);
app.get('key2'); // 'value2'
app.key2; // 'value2'
源码
Application.set = function (setting, val, attach) {
if (arguments.length === 1) {
return this.settings[setting];
}
this.settings[setting] = val;
if(attach) {
this[setting] = val;
}
return this;
};
get
Application.get()
- 类型:方法
- 参数
setting- 应用的设置
- 获取已设置的属性值
源码
Application.get = function (setting) {
return this.settings[setting];
};
enabled
Application.enabled()
- 类型:方法
- 参数
setting- 应用的设置
- 检查
setting设置项是否可用
源码
Application.enabled = function (setting) {
return !!this.get(setting);
};
disabled
Application.disabled()
- 类型:方法
- 参数
setting- 应用的设置
- 检查
setting设置项是否已禁用
源码
Application.disabled = function (setting) {
return !this.get(setting);
};
enable
Application.enable()
- 类型:方法
- 参数
setting- 应用的设置
- 启用
setting设置项
源码
Application.enable = function (setting) {
return this.set(setting, true);
};
disable
Application.disable()
- 类型:方法
- 参数
setting- 应用的设置
- 禁用
setting设置项
源码
Application.disable = function (setting) {
return this.set(setting, false);
};
configure
Application.configure()
- 类型:方法
- 参数
env- 应用的环境变量fn- 回调函数type- 服务器类型
- 为指定的环境变量
env及指定类型的服务配置回调函数fn。当未指定env时,将应用到所有环境变量;当未指定type时,将应用到所有服务器。
示例
app.configure(function(){
// 将在所有环境变量及所有类型的服务器执行
});
app.configure('development', function(){
// 在 development 环境变量执行
});
app.configure('development', 'connector', function(){
// 将在 development 环境变量及 connector 服务器执行
});
源码
Application.configure = function (env, type, fn) {
var args = [].slice.call(arguments);
fn = args.pop();
env = type = Constants.Reserved.ALL;
if(args.length > 0) {
env = args[0];
}
if(args.length > 1) {
type = args[1];
}
if (env === Constants.Reserved.ALL || contains(this.settings.env, env)) {
if (type === Constants.Reserved.ALL || contains(this.settings.serverType, type)) {
fn.call(this);
}
}
return this;
};
registerAdmin
Application.registerAdmin()
- 类型:方法
- 参数
moduleId- 可选,模块id或由module.moduleId提供module-module对象或模块的工厂函数opts- 模块的构建参数
- 注册 admin 模块。Admin 模块是监控系统的扩展点
源码
Application.registerAdmin = function(moduleId, module, opts) {
var modules = this.get(Constants.KeyWords.MODULE);
if(!modules) {
modules = {};
this.set(Constants.KeyWords.MODULE, modules);
}
if(typeof moduleId !== 'string') {
opts = module;
module = moduleId;
if(module) {
moduleId = module.moduleId;
}
}
if(!moduleId){
return;
}
modules[moduleId] = {
moduleId: moduleId,
module: module,
opts: opts
};
};
use
Application.use()
- 类型:方法
- 参数
plugin- 插件实例opts- 可选,插件工厂函数的构建参数
应用插件
源码
Application.use = function(plugin, opts) {
if(!plugin.components) {
logger.error('invalid components, no components exist');
return;
}
var self = this;
var dir = path.dirname(plugin.components);
if(!fs.existsSync(plugin.components)) {
logger.error('fail to find components, find path: %s', plugin.components);
return;
}
fs.readdirSync(plugin.components).forEach(function (filename) {
if (!/\.js$/.test(filename)) {
return;
}
var name = path.basename(filename, '.js');
var param = opts[name] || {};
var absolutePath = path.join(dir, Constants.Dir.COMPONENT, filename);
if(!fs.existsSync(absolutePath)) {
logger.error('component %s not exist at %s', name, absolutePath);
} else {
self.load(require(absolutePath), param);
}
});
// load events
if(!plugin.events) {
return;
} else {
if(!fs.existsSync(plugin.events)) {
logger.error('fail to find events, find path: %s', plugin.events);
return;
}
fs.readdirSync(plugin.events).forEach(function (filename) {
if (!/\.js$/.test(filename)) {
return;
}
var absolutePath = path.join(dir, Constants.Dir.EVENT, filename);
if(!fs.existsSync(absolutePath)) {
logger.error('events %s not exist at %s', filename, absolutePath);
} else {
bindEvents(require(absolutePath), self);
}
});
}
};
transaction
Application.transaction()
- 类型:方法
- 参数
name- 事务名conditions- 在事务之前所调用的函数handlers- 处理器,即在事务期间所调用的函数retry- 重复次数,即conditions执行成功后,handlers的调用次数
应用事务。事务包括条件和处理器两部分,当条件成立时,处理器将会执行,且可以指定重复执行的次数。事务日志会保存在logs/transaction.log文件中
源码
Application.transaction = function(name, conditions, handlers, retry) {
appManager.transaction(name, conditions, handlers, retry);
};
getMaster
Application.getMaster()
- 类型:方法
获取主服务器信息
源码
Application.getMaster = function() {
return this.master;
};
getCurServer
Application.getCurServer()
- 类型:方法
获取当前服务器信息
源码
Application.getCurServer = function() {
return this.curServer;
};
getServerId
Application.getServerId()
- 类型:方法
获取当前服务器 id
源码
Application.getServerId = function() {
return this.serverId;
};
getServerType
Application.getServerType()
- 类型:方法
获取当前服务器的类型
源码
Application.getServerType = function() {
return this.serverType;
};
getServers
Application.getServers()
- 类型:方法
获取当前所有服务器信息
源码
Application.getServers = function() {
return this.servers;
};
getServersFromConfig
Application.getServersFromConfig()
- 类型:方法
从servers.json文件获取所有服务器信息
源码
Application.getServersFromConfig = function() {
return this.get(Constants.KeyWords.SERVER_MAP);
};
getServerTypes
Application.getServerTypes()
- 类型:方法
获取所有服务器类型
源码
Application.getServerTypes = function() {
return this.serverTypes;
};
getServerById
Application.getServerById()
- 类型:方法
从当前服务器集中获取指定服务器 id 的服务器信息
源码
Application.getServerById = function(serverId) {
return this.servers[serverId];
};
getServerFromConfig
Application.getServerFromConfig()
- 类型:方法
-
参数
serverId- 服务器ID
从servers.json文件获取指定服务器的信息
源码
Application.getServerFromConfig = function(serverId) {
return this.get(Constants.keyWords.SERVER_MAP)[serverId];
};
getServersByType
Application.getServersByType()
- 类型:方法
-
参数
serverType- 服务器类型
获取指定类型的服务器信息
源码
Application.getServersByType = function(serverType) {
return this.serverTypeMaps[serverType];
};
isFrontend
Application.isFrontend()
- 类型:方法
-
参数
server- 要检查的服务器
检查服务器是否是前端服务器
源码
Application.isFrontend = function(server) {
server = server || this.getCurServer();
return !!server && server.frontend === 'true';
};
isBackend
Application.isBackend()
- 类型:方法
-
参数
server- 要检查的服务器
检查服务器是否是后端服务器
源码
Application.isBackend = function(server) {
server = server || this.getCurServer();
return !!server && !server.frontend;
};
isMaster
Application.isMaster()
- 类型:方法
检查当前服务器是否是主服务器
源码
Application.isMaster = function() {
return this.serverType === Constants.Reserved.MASTER;
};
addServers
Application.addServers()
- 类型:方法
-
参数
servers- 要添加的服务器列表
将新服务器添加到当前运行的应用中
源码
Application.addServers = function(servers) {
if(!servers || !servers.length) {
return;
}
var item, slist;
for(var i=0, l=servers.length; i < l; i++) {
item = servers[i];
// update global server map
this.servers[item.id] = item;
// update global server type map
slist = this.serverTypeMaps[item.serverType];
if(!slist) {
this.serverTypeMaps[item.serverType] = slist = [];
}
replaceServer(slist, item);
// update global server type list
if(this.serverTypes.indexOf(item.serverType) < 0) {
this.serverTypes.push(item.serverType);
}
}
this.event.emit(events.ADD_SERVERS, servers);
};
removeServers
Application.removeServers()
- 类型:方法
-
参数
ids- 服务器id列表
将服务器从当前运行的应用中删除
源码
Application.removeServers = function(ids) {
if(!ids || !ids.length) {
return;
}
var id, item, slist;
for(var i=0, l=ids.length; i < l; i++) {
id = ids[i];
item = this.servers[id];
if(!item) {
continue;
}
// clean global server map
delete this.servers[id];
// clean global server type map
slist = this.serverTypeMaps[item.serverType];
removeServer(slist, id);
// TODO: should remove the server type if the slist is empty?
}
this.event.emit(events.REMOVE_SERVERS, ids);
};
replaceServers
Application.replaceServers()
- 类型:方法
-
参数
server- id map
替换当前运行应用中的服务器
源码
Application.replaceServers = function(servers) {
if(!servers){
return;
}
this.servers = servers;
this.serverTypeMaps = {};
this.serverTypes = [];
var serverArray = [];
for(var id in servers){
var server = servers[id];
var serverType = server[Constants.Reserved.SERVER_TYPE];
var slist = this.serverTypeMaps[serverType];
if(!slist) {
this.serverTypeMaps[serverType] = slist = [];
}
this.serverTypeMaps[serverType].push(server);
// update global server type list
if(this.serverTypes.indexOf(serverType) < 0) {
this.serverTypes.push(serverType);
}
serverArray.push(server);
}
this.event.emit(events.REPLACE_SERVERS, serverArray);
};
addCrons
Application.addCrons()
- 类型:方法
-
参数
crons- 要添加到应用的新作业
将作业(定时任务)添加到当前运行的应用中
源码
Application.addCrons = function(crons) {
if(!crons || !crons.length) {
logger.warn('crons is not defined.');
return;
}
this.event.emit(events.ADD_CRONS, crons);
};
removeCrons
Application.removeCrons()
- 类型:方法
-
参数
crons- 要应用中移除的旧作业
从当前运行的应用中移除作业(定时任务)
源码
Application.removeCrons = function(crons) {
if(!crons || !crons.length) {
logger.warn('ids is not defined.');
return;
}
this.event.emit(events.REMOVE_CRONS, crons);
};
var replaceServer = function(slist, serverInfo) {
for(var i=0, l=slist.length; i < l; i++) {
if(slist[i].id === serverInfo.id) {
slist[i] = serverInfo;
return;
}
}
slist.push(serverInfo);
};
var removeServer = function(slist, id) {
if(!slist || !slist.length) {
return;
}
for(var i=0, l=slist.length; i < l; i++) {
if(slist[i].id === id) {
slist.splice(i, 1);
return;
}
}
};
var contains = function(str, settings) {
if(!settings) {
return false;
}
var ts = settings.split("|");
for(var i=0, l=ts.length; i < l; i++) {
if(str === ts[i]) {
return true;
}
}
return false;
};
var bindEvents = function(Event, app) {
var emethods = new Event(app);
for(var m in emethods) {
if(typeof emethods[m] === 'function') {
app.event.on(m, emethods[m].bind(emethods));
}
}
};
var addFilter = function(app, type, filter) {
var filters = app.get(type);
if(!filters) {
filters = [];
app.set(type, filters);
}
filters.push(filter);
};
3. BackendSessionService
该服务用于维持后端 session ,及与前端服务器的通信。
BackendSessionService会在每个服务器进程中创建,并维持当前进程的后端 session,并和与之相对应的前端服务器进行通信。
BackendSessionService实例可以通过app.get('backendSessionService')或app.backendSessionService来访问。
get
BackendSessionService.prototype.get()
- 类型:方法
-
参数
frontendId- 添加到当前 session 的前端服务器 idsid- session idcb- 回调函数,其签名为cb(err, BackendSession)
通过前端服务器服务器id 及 session id 获取后端 session
源码
BackendSessionService.prototype.get = function(frontendId, sid, cb) {
var namespace = 'sys';
var service = 'sessionRemote';
var method = 'getBackendSessionBySid';
var args = [sid];
rpcInvoke(this.app, frontendId, namespace, service, method,
args, BackendSessionCB.bind(null, this, cb));
};
getByUid
BackendSessionService.prototype.getByUid()
- 类型:方法
-
参数
frontendId- 添加到当前 session 的前端服务器 iduid- 与 session 绑定的用户idcb- 回调函数,其签名为cb(err, BackendSession)
通过前端服务器服务器id 及 user id 获取后端 session
源码
BackendSessionService.prototype.getByUid = function(frontendId, uid, cb) {
var namespace = 'sys';
var service = 'sessionRemote';
var method = 'getBackendSessionsByUid';
var args = [uid];
rpcInvoke(this.app, frontendId, namespace, service, method,
args, BackendSessionCB.bind(null, this, cb));
};
kickBySid
BackendSessionService.prototype.kickBySid()
- 类型:方法
-
参数
frontendId- 前端服务器 idsid- session idcb- 回调函数
将指定 id 的 session 从前端服务器踢出
源码
BackendSessionService.prototype.kickBySid = function(frontendId, sid, cb) {
var namespace = 'sys';
var service = 'sessionRemote';
var method = 'kickBySid';
var args = [sid];
rpcInvoke(this.app, frontendId, namespace, service, method, args, cb);
};
kickBySid
BackendSessionService.prototype.kickByUid()
- 类型:方法
-
参数
frontendId- 前端服务器 iduid- 用户 idcb- 回调函数
将指定用户 id 的 session 从前端服务器踢出
源码
BackendSessionService.prototype.kickByUid = function(frontendId, uid, cb) {
var namespace = 'sys';
var service = 'sessionRemote';
var method = 'kickByUid';
var args = [uid];
rpcInvoke(this.app, frontendId, namespace, service, method, args, cb);
};
4. BackendSession
BackendSession是对前端内部 session 的一个代理,它是在本地服务器保存的一个键/值对对象。其由BackendSessionService创建和维护,前端服务器本地 session不能直接访问,但我们可以通过BackendSession来对其访问和修改。
对于后端 session 的主要操作仅应该是读取,对后端 session 的任何修改都是本地的,会在下次请求中被丢弃。如果需要修改,就应该显式的将其推送(push或pushAll)到前端。推送后,会将相同 key 的值进行覆盖,并会在下次请求中看到修改后的值。如果要在不同进程中并发地推送 session,则必须确保外部事务。
bind
backendSession.prototype.bind()
- 类型:方法
-
参数
uid- 用户 idcb- 回调函数
将当前 session 与指定的 user id 绑定。这会推送 uid 到前端服务器,并绑定 uid 到前端内部 session。
源码
BackendSession.prototype.bind = function(uid, cb) {
var self = this;
this.__sessionService__.bind(this.frontendId, this.id, uid, function(err) {
if(!err) {
self.uid = uid;
}
utils.invokeCallback(cb, err);
});
};
unbind
backendSession.prototype.unbind()
- 类型:方法
-
参数
uid- 用户 idcb- 回调函数
将当前 session 与 user id 解绑。这会推送 uid 到前端服务器,并将 uid 与前端内部 session 解绑。
源码
BackendSession.prototype.unbind = function(uid, cb) {
var self = this;
this.__sessionService__.unbind(this.frontendId, this.id, uid, function(err) {
if(!err) {
self.uid = null;
}
utils.invokeCallback(cb, err);
});
};
set
backendSession.prototype.set()
- 类型:方法
-
参数
key- keyvalue- value
设置 key/value 到后端 session。
源码
BackendSession.prototype.set = function(key, value) {
this.settings[key] = value;
};
get
backendSession.prototype.get()
- 类型:方法
-
参数
key- key
获取后端 session 中,指定 key 的值。
源码
BackendSession.prototype.get = function(key) {
return this.settings[key];
};
push
backendSession.prototype.push()
- 类型:方法
-
参数
key- keycb- 回调函数
推送后端 session 中指定 key/value 到前端服务器内部 session。
源码
BackendSession.prototype.push = function(key, cb) {
this.__sessionService__.push(this.frontendId, this.id, key, this.get(key), cb);
};
pushAll
backendSession.prototype.pushAll()
- 类型:方法
-
参数
cb- 回调函数
推送后端 session 中所有 key/value 到前端服务器内部 session。
源码
BackendSession.prototype.pushAll = function(cb) {
this.__sessionService__.pushAll(this.frontendId, this.id, this.settings, cb);
};
5. SessionService
Session 服务即前端服务器Session服务,它为每个已连接的客户端维持一个内部 session。
SessionService由 session 组件创建,并且其仅对前端服务器有效。在前端服务器中,可以通过app.get('sessionService')或app.sessionService访问。
kick
channelService.prototype.kick()
- 类型:方法
-
参数
uid- 与 session 关联的用户 idcb- 回调函数
将指定用户的所有 session 踢下线。
源码
SessionService.prototype.kick = function(uid, reason, cb) {
// compatible for old kick(uid, cb);
if(typeof reason === 'function') {
cb = reason;
reason = 'kick';
}
var sessions = this.getByUid(uid);
if(sessions) {
// notify client
for(var i=0, l=sessions.length; i < l; i++) {
sessions[i].closed(reason);
}
process.nextTick(function() {
utils.invokeCallback(cb);
});
} else {
process.nextTick(function() {
utils.invokeCallback(cb);
});
}
};
kickBySessionId
channelService.prototype.kickBySessionId()
- 类型:方法
-
参数
sid- session idcb- 回调函数
通过 session id将用户踢下线。
源码
SessionService.prototype.kickBySessionId = function(sid, cb) {
var session = this.get(sid);
if(session) {
// notify client
session.closed('kick');
process.nextTick(function() {
utils.invokeCallback(cb);
});
} else {
process.nextTick(function() {
utils.invokeCallback(cb);
});
}
};
5. ChannelService
在本地服务器创建并维持频道(channel)
ChannelService由channel组件创建,该组件是 pomelo 默认加载的组件,其可以通过app.get('channelService')访问。
createChannel
channelService.prototype.createChannel()
- 类型:方法
-
参数
name- channel 名
创建指定名称的 channel。
源码
ChannelService.prototype.createChannel = function(name) {
if(this.channels[name]) {
return this.channels[name];
}
var c = new Channel(name, this);
this.channels[name] = c;
return c;
};
getChannel
channelService.prototype.getChannel()
- 类型:方法
-
参数
name- channel 名create- 设置为true时,将会创建 channel
获取指定名称的 channel。
源码
ChannelService.prototype.getChannel = function(name, create) {
var channel = this.channels[name];
if(!channel && !!create) {
channel = this.channels[name] = new Channel(name, this);
}
return channel;
};
destroyChannel
channelService.prototype.destroyChannel()
- 类型:方法
-
参数
name- channel 名
销毁指定名称的 channel。
源码
ChannelService.prototype.destroyChannel = function(name) {
delete this.channels[name];
};
pushMessageByUids
channelService.prototype.pushMessageByUids()
- 类型:方法
-
参数
route- 消息路由msg- 消息uids- 接收消息的用户信息列表,其格式为:[{uid: userId, sid: frontendServerId}]opts- 可选,用户定义的推送选项cb- 回调函数,其签名为cb(err)
向指定的uids推送消息。
源码
ChannelService.prototype.pushMessageByUids = function(route, msg, uids, opts, cb) {
if(typeof route !== 'string') {
cb = opts;
opts = uids;
uids = msg;
msg = route;
route = msg.route;
}
if(!cb && typeof opts === 'function') {
cb = opts;
opts = {};
}
if(!uids || uids.length === 0) {
utils.invokeCallback(cb, new Error('uids should not be empty'));
return;
}
var groups = {}, record;
for(var i=0, l=uids.length; i < l; i++) {
record = uids[i];
add(record.uid, record.sid, groups);
}
sendMessageByGroup(this, route, msg, groups, opts, cb);
};
broadcast
channelService.prototype.broadcast()
- 类型:方法
-
参数
stype- 字符串,前端服务器类型route- 消息路由msg- 消息opts- 可选,用户定义的推送选项cb- 回调函数,其签名为cb(err)
向所有已连接的推送消息。
源码
ChannelService.prototype.broadcast = function(stype, route, msg, opts, cb) {
var app = this.app;
var namespace = 'sys';
var service = 'channelRemote';
var method = 'broadcast';
var servers = app.getServersByType(stype);
if(!servers || servers.length === 0) {
// server list is empty
utils.invokeCallback(cb);
return;
}
var count = servers.length;
var successFlag = false;
var latch = countDownLatch.createCountDownLatch(count, function() {
if(!successFlag) {
utils.invokeCallback(cb, new Error('broadcast fails'));
return;
}
utils.invokeCallback(cb, null);
});
var genCB = function() {
return function(err) {
if(err) {
logger.error('[broadcast] fail to push message, err:' + err.stack);
latch.done();
return;
}
successFlag = true;
latch.done();
};
};
opts = {type: 'broadcast', userOptions: opts || {}};
// for compatiblity
opts.isBroadcast = true;
if(opts.userOptions) {
opts.binded = opts.userOptions.binded;
opts.filterParam = opts.userOptions.filterParam;
}
for(var i=0, l=count; i < l; i++) {
app.rpcInvoke(servers[i].id, {namespace: namespace, service: service,
method: method, args: [route, msg, opts]}, genCB());
}
};
7. Channel
Channel维持已订阅的接收者集合。你将可以添加用户到一个channel中,并通过 channel 广播消息。
add
channelService.prototype.add()
- 类型:方法
-
参数
uid- 要添加的用户的 idsid- 用户连接的前端服务器的id
添加用户到 channel 中
源码
Channel.prototype.add = function(uid, sid) {
if(this.state > ST_INITED) {
return false;
} else {
var res = add(uid, sid, this.groups);
if(res) {
this.records[uid] = {sid: sid, uid: uid};
}
return res;
}
};
leave
channelService.prototype.leave()
- 类型:方法
-
参数
uid- 用户的 idsid- 用户连接的前端服务器的id
将用户从 channel 中移除
源码
Channel.prototype.leave = function(uid, sid) {
if(!uid || !sid) {
return false;
}
delete this.records[uid];
var res = deleteFrom(uid, sid, this.groups[sid]);
if(this.groups[sid] && this.groups[sid].length === 0) {
delete this.groups[sid];
}
return res;
};
getMembers
channelService.prototype.getMembers()
- 类型:方法
获取 channel 中的所有成员(用户)
注意:本操作较重,应少用或不用
源码
Channel.prototype.getMembers = function() {
var res = [], groups = this.groups;
var group, i, l;
for(var sid in groups) {
group = groups[sid];
for(i=0, l=group.length; i < l; i++) {
res.push(group[i]);
}
}
return res;
};
getMember
channelService.prototype.getMember()
- 类型:方法
- 参数
uid- 用户的 id
获取 channel 中指定用户的信息
源码
Channel.prototype.getMember = function(uid) {
return this.records[uid];
};
destroy
channelService.prototype.destroy()
- 类型:方法
销毁指定的 channel
源码
Channel.prototype.destroy = function() {
this.state = ST_DESTROYED;
this.__channelService__.destroyChannel(this.name);
};
pushMessage
channelService.prototype.pushMessage()
- 类型:方法
-
参数
route- 消息路由msg- 消息opts- 可选,用户定义的推送选项cb- 回调函数,其签名为cb(err)
将消息推送到 channel 中的所有成员
源码
Channel.prototype.pushMessage = function(route, msg, opts, cb) {
if(this.state !== ST_INITED) {
utils.invokeCallback(new Error('channel is not running now'));
return;
}
if(typeof route !== 'string') {
cb = opts;
opts = msg;
msg = route;
route = msg.route;
}
if(!cb && typeof opts === 'function') {
cb = opts;
opts = {};
}
sendMessageByGroup(this.__channelService__, route, msg, this.groups, opts, cb);
};
