Pomelo 服务端对象及API



  1. Pomelo
  2. Application
  3. BackendSessionService
  4. BackendSession
  5. SessionService
  6. ChannelService
  7. Channel

1. Pomelo

导出createApplication()

Pomelo是引用'pomelo'模块后,所获取的一个顶级对象。

createApp

Pomelo.createApp()

  • 类型:方法

创建 pomelo 应用

通过Pomelo.createApp()所创建的应用,将会做为一个只读属性挂到Pomelo上,并可通过Pomelo.app来访问所创建的应用。如下所示:

const pomelo = require('pomelo');

// 访问应用
pomelo.app;

源码

Pomelo.createApp = function (opts) {
  var app = application;
  app.init(opts);
  self.app = app;
  return app;
};


2. Application

Application 原型

getBase

Application.getBase()

  • 类型:方法

获取应用的基础路径

Pomelo 应用启动后,可通过以下方式获取路径:

app.getBase();    // /home/game 

源码

Application.getBase = function() {
  return this.get(Constants.Reserved.BASE);
};


filter

Application.filter()

  • 类型:方法
  • 参数
    • filter所要添加的筛选器

将一个filter添加到beforeFilterafterFilter

源码

Application.filter = function (filter) {
  this.before(filter);
  this.after(filter);
};


before

Application.before()

  • 类型:方法
  • 参数
    • bf所要添加的beforeFilter筛选器,其签名为bf(msg, session, next)

添加一个beforeFilter

源码

Application.before = function (bf) {
  addFilter(this, Constants.KeyWords.BEFORE_FILTER, bf);
};


after

Application.after()

  • 类型:方法
  • 参数
    • af所要添加的afterFilter筛选器,其签名为af(err, msg, session, resp, nex)

添加一个afterFilter

源码

Application.after = function (af) {
  addFilter(this, Constants.KeyWords.AFTER_FILTER, af);
};


globalFilter

Application.globalFilter()

  • 类型:方法
  • 参数
    • filter所要添加的筛选器

添加一个全局筛选器到全局beforeFilter和全局afterFilter

源码

Application.globalFilter = function (filter) {
  this.globalBefore(filter);
  this.globalAfter(filter);
};


globalBefore

Application.globalBefore()

  • 类型:方法
  • 参数
    • bf所要添加的beforeFilter筛选器,其签名为bf(msg, session, next)

添加一个全局beforeFilter

源码

Application.globalBefore = function (bf) {
  addFilter(this, Constants.KeyWords.GLOBAL_BEFORE_FILTER, bf);
};


globalAfter

Application.globalAfter()

  • 类型:方法
  • 参数
    • af所要添加的afterFilter筛选器,其签名为af(err, msg, session, resp, nex)

添加一个全局afterFilter

源码

Application.globalAfter = function (af) {
  addFilter(this, Constants.KeyWords.GLOBAL_AFTER_FILTER, af);
};


rpcBefore

Application.rpcBefore()

  • 类型:方法
  • 参数
    • bf所要添加的 rpc beforeFilter筛选器,其签名为bf(serverId, msg, opts, next)

添加一个rpc beforeFilter

源码

Application.rpcBefore = function(bf) {
  addFilter(this, Constants.KeyWords.RPC_BEFORE_FILTER, bf);
};


rpcAfter

Application.rpcAfter()

  • 类型:方法
  • 参数
    • af所要添加的 rpc afterFilter筛选器,其签名为bf(serverId, msg, opts, next)
  • 添加一个rpc afterFilter

源码

Application.rpcAfter = function(af) {
  addFilter(this, Constants.KeyWords.RPC_AFTER_FILTER, af);
};


rpcFilter

Application.rpcFilter()

  • 类型:方法
  • 参数
    • filter所要添加到 rpc beforeFilterafterFilter的筛选器
  • 添加一个rpc filter 到beforeFilterafterFilter

源码

Application.rpcFilter = function(filter) {
  this.rpcBefore(filter);
  this.rpcAfter(filter);
};


load

Application.load()

  • 类型:方法
  • 参数
    • name - 可选,组件名
    • component - 组件实例,或组件的工厂方法
    • opts - 可选,提供给工厂方法的构建参数
  • 加载组件

源码

Application.load = function(name, component, opts) {
  if(typeof name !== 'string') {
    opts = component;
    component = name;
    name = null;
    if(typeof component.name === 'string') {
      name = component.name;
    }
  }

  if(typeof component === 'function') {
    component = component(this, opts);
  }

  if(!name && typeof component.name === 'string') {
    name = component.name;
  }

  if(name && this.components[name]) {
    // ignore duplicat component
    logger.warn('ignore duplicate component: %j', name);
    return;
  }

  this.loaded.push(component);
  if(name) {
    // components with a name would get by name throught app.components later.
    this.components[name] = component;
  }

  return this;
};


loadConfig

Application.loadConfig()

  • 类型:方法
  • 参数
    • key - 环境变量 key
    • val - 环境变量值
  • 加载 JSON 文件并设置

源码

Application.loadConfig = function (key, val) {
  var env = this.get(Constants.Reserved.ENV);
  val = require(val);
  if (val[env]) {
    val = val[env];
  }
  this.set(key, val);
};


route

Application.route()

  • 类型:方法
  • 参数
    • serverType - 字符串,表示服务器类型
    • routeFunc - 路由函数,其签名为routeFunc(session, msg, app, cb)
  • 为指定类型的服务器设置路由函数

示例

app.route('area', routeFunc);
var routeFunc = function(session, msg, app, cb) { 
  // 所到 area的请求都会被路由到第一台 area 服务器
  var areas = app.getServersByType('area'); 
  cb(null, areas[0].id); 
};

源码

Application.route = function(serverType, routeFunc) {
  var routes = this.get(Constants.KeyWords.ROUTE);
  if(!routes) {
    routes = {};
    this.set(Constants.KeyWords.ROUTE, routes);
  }
  routes[serverType] = routeFunc;
  return this;
};


beforeStopHook

Application.beforeStopHook()

  • 类型:方法
  • 参数
    • fun - 服务器停止前所调用的函数
  • 设置在服务器停止前所调用的钩子函数

源码

Application.beforeStopHook = function(fun) {
  logger.warn('this method was deprecated in pomelo 0.8');
  if(!!fun && typeof fun === 'function') {
    this.set(Constants.KeyWords.BEFORE_STOP_HOOK, fun);
  }
};


start

Application.start()

  • 类型:方法
  • 参数
    • cb - 回调函数
  • 启动应用。应用启动后会加载默认组件,并启动所有已加载的组件

源码

Application.start = function(cb) {
  this.startTime = Date.now();
  if(this.state > STATE_INITED) {
    utils.invokeCallback(cb, new Error('application has already start.'));
    return;
  }
  appUtil.loadDefaultComponents(this);
  var self = this;
  var startUp = function() {
    appUtil.optComponents(self.loaded, Constants.Reserved.START, function(err) {
      self.state = STATE_START;
      if(err) {
        utils.invokeCallback(cb, err);
      } else {
        logger.info('%j enter after start...', self.getServerId());
        self.afterStart(cb);
      }
    });
  };
  var beforeFun = this.lifecycleCbs[Constants.LIFECYCLE.BEFORE_STARTUP];
  if(!!beforeFun) {
    beforeFun.call(null, this, startUp);
  } else {
    startUp();
  }
};


set

Application.set()

  • 类型:方法
  • 参数
    • setting - 应用的设置
    • val - 所设置的值
    • attach - 是否将设置附加到应用
  • 指定setting的值为val,或返回已设置的值

示例

app.set('key1', 'value1'); 
app.get('key1'); // 'value1' 
app.key1; // undefined
app.set('key2', 'value2', true); 
app.get('key2'); // 'value2' 
app.key2; // 'value2'

源码

Application.set = function (setting, val, attach) {
  if (arguments.length === 1) {
    return this.settings[setting];
  }
  this.settings[setting] = val;
  if(attach) {
    this[setting] = val;
  }
  return this;
};


get

Application.get()

  • 类型:方法
  • 参数
    • setting - 应用的设置
  • 获取已设置的属性值

源码

Application.get = function (setting) {
  return this.settings[setting];
};


enabled

Application.enabled()

  • 类型:方法
  • 参数
    • setting - 应用的设置
  • 检查setting设置项是否可用

源码

Application.enabled = function (setting) {
  return !!this.get(setting);
};


disabled

Application.disabled()

  • 类型:方法
  • 参数
    • setting - 应用的设置
  • 检查setting设置项是否已禁用

源码

Application.disabled = function (setting) {
  return !this.get(setting);
};


enable

Application.enable()

  • 类型:方法
  • 参数
    • setting - 应用的设置
  • 启用setting设置项

源码

Application.enable = function (setting) {
  return this.set(setting, true);
};


disable

Application.disable()

  • 类型:方法
  • 参数
    • setting - 应用的设置
  • 禁用setting设置项

源码

Application.disable = function (setting) {
  return this.set(setting, false);
};


configure

Application.configure()

  • 类型:方法
  • 参数
    • env - 应用的环境变量
    • fn - 回调函数
    • type - 服务器类型
  • 为指定的环境变量env及指定类型的服务配置回调函数fn。当未指定env时,将应用到所有环境变量;当未指定type时,将应用到所有服务器。

示例

app.configure(function(){ 
  // 将在所有环境变量及所有类型的服务器执行
 });
app.configure('development', function(){ 
  // 在 development 环境变量执行 
});
app.configure('development', 'connector', function(){ 
  // 将在 development 环境变量及 connector 服务器执行
});

源码

Application.configure = function (env, type, fn) {
  var args = [].slice.call(arguments);
  fn = args.pop();
  env = type = Constants.Reserved.ALL;

  if(args.length > 0) {
    env = args[0];
  }
  if(args.length > 1) {
    type = args[1];
  }

  if (env === Constants.Reserved.ALL || contains(this.settings.env, env)) {
    if (type === Constants.Reserved.ALL || contains(this.settings.serverType, type)) {
      fn.call(this);
    }
  }
  return this;
};


registerAdmin

Application.registerAdmin()

  • 类型:方法
  • 参数
    • moduleId - 可选,模块id或由module.moduleId提供
    • module - module对象或模块的工厂函数
    • opts - 模块的构建参数
  • 注册 admin 模块。Admin 模块是监控系统的扩展点

源码

Application.registerAdmin = function(moduleId, module, opts) {
  var modules = this.get(Constants.KeyWords.MODULE);
  if(!modules) {
    modules = {};
    this.set(Constants.KeyWords.MODULE, modules);
  }

  if(typeof moduleId !== 'string') {
    opts = module;
    module = moduleId;
    if(module) {
      moduleId = module.moduleId;
    }
  }

  if(!moduleId){
    return;
  }

  modules[moduleId] = {
    moduleId: moduleId,
    module: module,
    opts: opts
  };
};


use

Application.use()

  • 类型:方法
  • 参数
    • plugin - 插件实例
    • opts - 可选,插件工厂函数的构建参数

应用插件

源码

Application.use = function(plugin, opts) {
  if(!plugin.components) {
    logger.error('invalid components, no components exist');
    return;
  }
  
  var self = this;
  var dir = path.dirname(plugin.components);

  if(!fs.existsSync(plugin.components)) {
    logger.error('fail to find components, find path: %s', plugin.components);
    return;
  }

  fs.readdirSync(plugin.components).forEach(function (filename) {
    if (!/\.js$/.test(filename)) {
      return;
    }
    var name = path.basename(filename, '.js');
    var param = opts[name] || {};
    var absolutePath = path.join(dir, Constants.Dir.COMPONENT, filename);
    if(!fs.existsSync(absolutePath)) {
      logger.error('component %s not exist at %s', name, absolutePath);
    } else {
      self.load(require(absolutePath), param);
    }
  });

  // load events
  if(!plugin.events) {
    return;
  } else {
    if(!fs.existsSync(plugin.events)) {
      logger.error('fail to find events, find path: %s', plugin.events);
      return;
    }

    fs.readdirSync(plugin.events).forEach(function (filename) {
      if (!/\.js$/.test(filename)) {
        return;
      }
      var absolutePath = path.join(dir, Constants.Dir.EVENT, filename);
      if(!fs.existsSync(absolutePath)) {
        logger.error('events %s not exist at %s', filename, absolutePath);
      } else {
        bindEvents(require(absolutePath), self);
      }
    });
  }
};


transaction

Application.transaction()

  • 类型:方法
  • 参数
    • name - 事务名
    • conditions - 在事务之前所调用的函数
    • handlers - 处理器,即在事务期间所调用的函数
    • retry - 重复次数,即conditions执行成功后,handlers的调用次数

应用事务。事务包括条件和处理器两部分,当条件成立时,处理器将会执行,且可以指定重复执行的次数。事务日志会保存在logs/transaction.log文件中

源码

Application.transaction = function(name, conditions, handlers, retry) {
  appManager.transaction(name, conditions, handlers, retry);
};


getMaster

Application.getMaster()

  • 类型:方法

获取主服务器信息

源码

Application.getMaster = function() {
  return this.master;
};


getCurServer

Application.getCurServer()

  • 类型:方法

获取当前服务器信息

源码

Application.getCurServer = function() {
  return this.curServer;
};


getServerId

Application.getServerId()

  • 类型:方法

获取当前服务器 id

源码

Application.getServerId = function() {
  return this.serverId;
};


getServerType

Application.getServerType()

  • 类型:方法

获取当前服务器的类型

源码

Application.getServerType = function() {
  return this.serverType;
};


getServers

Application.getServers()

  • 类型:方法

获取当前所有服务器信息

源码

Application.getServers = function() {
  return this.servers;
};


getServersFromConfig

Application.getServersFromConfig()

  • 类型:方法

servers.json文件获取所有服务器信息

源码

Application.getServersFromConfig = function() {
  return this.get(Constants.KeyWords.SERVER_MAP);
};


getServerTypes

Application.getServerTypes()

  • 类型:方法

获取所有服务器类型

源码

Application.getServerTypes = function() {
  return this.serverTypes;
};


getServerById

Application.getServerById()

  • 类型:方法

从当前服务器集中获取指定服务器 id 的服务器信息

源码

Application.getServerById = function(serverId) {
  return this.servers[serverId];
};


getServerFromConfig

Application.getServerFromConfig()

  • 类型:方法
  • 参数
    • serverId - 服务器ID

servers.json文件获取指定服务器的信息

源码

Application.getServerFromConfig = function(serverId) {
  return this.get(Constants.keyWords.SERVER_MAP)[serverId];
};


getServersByType

Application.getServersByType()

  • 类型:方法
  • 参数
    • serverType - 服务器类型

获取指定类型的服务器信息

源码

Application.getServersByType = function(serverType) {
  return this.serverTypeMaps[serverType];
};


isFrontend

Application.isFrontend()

  • 类型:方法
  • 参数
    • server - 要检查的服务器

检查服务器是否是前端服务器

源码

Application.isFrontend = function(server) {
  server = server || this.getCurServer();
  return !!server && server.frontend === 'true';
};


isBackend

Application.isBackend()

  • 类型:方法
  • 参数
    • server - 要检查的服务器

检查服务器是否是后端服务器

源码

Application.isBackend = function(server) {
  server = server || this.getCurServer();
  return !!server && !server.frontend;
};


isMaster

Application.isMaster()

  • 类型:方法

检查当前服务器是否是主服务器

源码

Application.isMaster = function() {
  return this.serverType === Constants.Reserved.MASTER;
};


addServers

Application.addServers()

  • 类型:方法
  • 参数
    • servers - 要添加的服务器列表

将新服务器添加到当前运行的应用中

源码

Application.addServers = function(servers) {
  if(!servers || !servers.length) {
    return;
  }

  var item, slist;
  for(var i=0, l=servers.length; i < l; i++) {
    item = servers[i];
    // update global server map
    this.servers[item.id] = item;

    // update global server type map
    slist = this.serverTypeMaps[item.serverType];
    if(!slist) {
      this.serverTypeMaps[item.serverType] = slist = [];
    }
    replaceServer(slist, item);

    // update global server type list
    if(this.serverTypes.indexOf(item.serverType) < 0) {
      this.serverTypes.push(item.serverType);
    }
  }
  this.event.emit(events.ADD_SERVERS, servers);
};


removeServers

Application.removeServers()

  • 类型:方法
  • 参数
    • ids - 服务器id列表

将服务器从当前运行的应用中删除

源码

Application.removeServers = function(ids) {
  if(!ids || !ids.length) {
    return;
  }

  var id, item, slist;
  for(var i=0, l=ids.length; i < l; i++) {
    id = ids[i];
    item = this.servers[id];
    if(!item) {
      continue;
    }
    // clean global server map
    delete this.servers[id];

    // clean global server type map
    slist = this.serverTypeMaps[item.serverType];
    removeServer(slist, id);
    // TODO: should remove the server type if the slist is empty?
  }
  this.event.emit(events.REMOVE_SERVERS, ids);
};


replaceServers

Application.replaceServers()

  • 类型:方法
  • 参数
    • server - id map

替换当前运行应用中的服务器

源码

Application.replaceServers = function(servers) {
  if(!servers){
    return;
  }

  this.servers = servers;
  this.serverTypeMaps = {};
  this.serverTypes = [];
  var serverArray = [];
  for(var id in servers){
    var server = servers[id];
    var serverType = server[Constants.Reserved.SERVER_TYPE];
    var slist = this.serverTypeMaps[serverType];
    if(!slist) {
      this.serverTypeMaps[serverType] = slist = [];
    }
    this.serverTypeMaps[serverType].push(server);
    // update global server type list
    if(this.serverTypes.indexOf(serverType) < 0) {
      this.serverTypes.push(serverType);
    }
    serverArray.push(server);
  }
  this.event.emit(events.REPLACE_SERVERS, serverArray);
};


addCrons

Application.addCrons()

  • 类型:方法
  • 参数
    • crons - 要添加到应用的新作业

将作业(定时任务)添加到当前运行的应用中

源码

Application.addCrons = function(crons) {
  if(!crons || !crons.length) {
    logger.warn('crons is not defined.');
    return;
  }
  this.event.emit(events.ADD_CRONS, crons);
};


removeCrons

Application.removeCrons()

  • 类型:方法
  • 参数
    • crons - 要应用中移除的旧作业

从当前运行的应用中移除作业(定时任务)

源码

Application.removeCrons = function(crons) {
  if(!crons || !crons.length) {
    logger.warn('ids is not defined.');
    return;
  }
  this.event.emit(events.REMOVE_CRONS, crons);
};

var replaceServer = function(slist, serverInfo) {
  for(var i=0, l=slist.length; i < l; i++) {
    if(slist[i].id === serverInfo.id) {
      slist[i] = serverInfo;
      return;
    }
  }
  slist.push(serverInfo);
};

var removeServer = function(slist, id) {
  if(!slist || !slist.length) {
    return;
  }

  for(var i=0, l=slist.length; i < l; i++) {
    if(slist[i].id === id) {
      slist.splice(i, 1);
      return;
    }
  }
};

var contains = function(str, settings) {
  if(!settings) {
    return false;
  }

  var ts = settings.split("|");
  for(var i=0, l=ts.length; i < l; i++) {
    if(str === ts[i]) {
      return true;
    }
  }
  return false;
};

var bindEvents = function(Event, app) {
  var emethods = new Event(app);
  for(var m in emethods) {
    if(typeof emethods[m] === 'function') {
      app.event.on(m, emethods[m].bind(emethods));
    }
  }
};

var addFilter = function(app, type, filter) {
 var filters = app.get(type);
  if(!filters) {
    filters = [];
    app.set(type, filters);
  }
  filters.push(filter);
};


3. BackendSessionService

该服务用于维持后端 session ,及与前端服务器的通信。

BackendSessionService会在每个服务器进程中创建,并维持当前进程的后端 session,并和与之相对应的前端服务器进行通信。

BackendSessionService实例可以通过app.get('backendSessionService')app.backendSessionService来访问。


get

BackendSessionService.prototype.get()

  • 类型:方法
  • 参数
    • frontendId - 添加到当前 session 的前端服务器 id
    • sid - session id
    • cb - 回调函数,其签名为cb(err, BackendSession)

通过前端服务器服务器id 及 session id 获取后端 session

源码

BackendSessionService.prototype.get = function(frontendId, sid, cb) {
  var namespace = 'sys';
  var service = 'sessionRemote';
  var method = 'getBackendSessionBySid';
  var args = [sid];
  rpcInvoke(this.app, frontendId, namespace, service, method,
            args, BackendSessionCB.bind(null, this, cb));
};


getByUid

BackendSessionService.prototype.getByUid()

  • 类型:方法
  • 参数
    • frontendId - 添加到当前 session 的前端服务器 id
    • uid - 与 session 绑定的用户id
    • cb - 回调函数,其签名为cb(err, BackendSession)

通过前端服务器服务器id 及 user id 获取后端 session

源码

BackendSessionService.prototype.getByUid = function(frontendId, uid, cb) {
  var namespace = 'sys';
  var service = 'sessionRemote';
  var method = 'getBackendSessionsByUid';
  var args = [uid];
  rpcInvoke(this.app, frontendId, namespace, service, method,
            args, BackendSessionCB.bind(null, this, cb));
};


kickBySid

BackendSessionService.prototype.kickBySid()

  • 类型:方法
  • 参数
    • frontendId - 前端服务器 id
    • sid - session id
    • cb - 回调函数

将指定 id 的 session 从前端服务器踢出

源码

BackendSessionService.prototype.kickBySid = function(frontendId, sid, cb) {
  var namespace = 'sys';
  var service = 'sessionRemote';
  var method = 'kickBySid';
  var args = [sid];
  rpcInvoke(this.app, frontendId, namespace, service, method, args, cb);
};


kickBySid

BackendSessionService.prototype.kickByUid()

  • 类型:方法
  • 参数
    • frontendId - 前端服务器 id
    • uid - 用户 id
    • cb - 回调函数

将指定用户 id 的 session 从前端服务器踢出

源码

BackendSessionService.prototype.kickByUid = function(frontendId, uid, cb) {
  var namespace = 'sys';
  var service = 'sessionRemote';
  var method = 'kickByUid';
  var args = [uid];
  rpcInvoke(this.app, frontendId, namespace, service, method, args, cb);
};


4. BackendSession

BackendSession是对前端内部 session 的一个代理,它是在本地服务器保存的一个键/值对对象。其由BackendSessionService创建和维护,前端服务器本地 session不能直接访问,但我们可以通过BackendSession来对其访问和修改。

对于后端 session 的主要操作仅应该是读取,对后端 session 的任何修改都是本地的,会在下次请求中被丢弃。如果需要修改,就应该显式的将其推送(pushpushAll)到前端。推送后,会将相同 key 的值进行覆盖,并会在下次请求中看到修改后的值。如果要在不同进程中并发地推送 session,则必须确保外部事务。


bind

backendSession.prototype.bind()

  • 类型:方法
  • 参数
    • uid - 用户 id
    • cb - 回调函数

将当前 session 与指定的 user id 绑定。这会推送 uid 到前端服务器,并绑定 uid 到前端内部 session。

源码

BackendSession.prototype.bind = function(uid, cb) {
  var self = this;
  this.__sessionService__.bind(this.frontendId, this.id, uid, function(err) {
    if(!err) {
      self.uid = uid;
    }
    utils.invokeCallback(cb, err);
  });
};


unbind

backendSession.prototype.unbind()

  • 类型:方法
  • 参数
    • uid - 用户 id
    • cb - 回调函数

将当前 session 与 user id 解绑。这会推送 uid 到前端服务器,并将 uid 与前端内部 session 解绑。

源码

BackendSession.prototype.unbind = function(uid, cb) {
  var self = this;
  this.__sessionService__.unbind(this.frontendId, this.id, uid, function(err) {
    if(!err) {
      self.uid = null;
    }
    utils.invokeCallback(cb, err);
  });
};


set

backendSession.prototype.set()

  • 类型:方法
  • 参数
    • key - key
    • value - value

设置 key/value 到后端 session。

源码

BackendSession.prototype.set = function(key, value) {
  this.settings[key] = value;
};


get

backendSession.prototype.get()

  • 类型:方法
  • 参数
    • key - key

获取后端 session 中,指定 key 的值。

源码

BackendSession.prototype.get = function(key) {
  return this.settings[key];
};


push

backendSession.prototype.push()

  • 类型:方法
  • 参数
    • key - key
    • cb - 回调函数

推送后端 session 中指定 key/value 到前端服务器内部 session。

源码

BackendSession.prototype.push = function(key, cb) {
  this.__sessionService__.push(this.frontendId, this.id, key, this.get(key), cb);
};


pushAll

backendSession.prototype.pushAll()

  • 类型:方法
  • 参数
    • cb - 回调函数

推送后端 session 中所有 key/value 到前端服务器内部 session。

源码

BackendSession.prototype.pushAll = function(cb) {
  this.__sessionService__.pushAll(this.frontendId, this.id, this.settings, cb);
};


5. SessionService

Session 服务即前端服务器Session服务,它为每个已连接的客户端维持一个内部 session。

SessionService由 session 组件创建,并且其仅对前端服务器有效。在前端服务器中,可以通过app.get('sessionService')app.sessionService访问。


kick

channelService.prototype.kick()

  • 类型:方法
  • 参数
    • uid - 与 session 关联的用户 id
    • cb - 回调函数

将指定用户的所有 session 踢下线。

源码

SessionService.prototype.kick = function(uid, reason, cb) {
  // compatible for old kick(uid, cb);
  if(typeof reason === 'function') {
    cb = reason;
    reason = 'kick';
  }
  var sessions = this.getByUid(uid);

  if(sessions) {
    // notify client
    for(var i=0, l=sessions.length; i < l; i++) {
      sessions[i].closed(reason);
    }
    process.nextTick(function() {
      utils.invokeCallback(cb);
    });
  } else {
    process.nextTick(function() {
      utils.invokeCallback(cb);
    });
  }
};


kickBySessionId

channelService.prototype.kickBySessionId()

  • 类型:方法
  • 参数
    • sid - session id
    • cb - 回调函数

通过 session id将用户踢下线。

源码

SessionService.prototype.kickBySessionId = function(sid, cb) {
  var session = this.get(sid);

  if(session) {
    // notify client
    session.closed('kick');
    process.nextTick(function() {
      utils.invokeCallback(cb);
    });
  } else {
    process.nextTick(function() {
      utils.invokeCallback(cb);
    });
  }
};


5. ChannelService

在本地服务器创建并维持频道(channel

ChannelServicechannel组件创建,该组件是 pomelo 默认加载的组件,其可以通过app.get('channelService')访问。


createChannel

channelService.prototype.createChannel()

  • 类型:方法
  • 参数
    • name - channel 名

创建指定名称的 channel。

源码

ChannelService.prototype.createChannel = function(name) {
  if(this.channels[name]) {
    return this.channels[name];
  }

  var c = new Channel(name, this);
  this.channels[name] = c;
  return c;
};


getChannel

channelService.prototype.getChannel()

  • 类型:方法
  • 参数
    • name - channel 名
    • create - 设置为true时,将会创建 channel

获取指定名称的 channel。

源码

ChannelService.prototype.getChannel = function(name, create) {
  var channel = this.channels[name];
  if(!channel && !!create) {
    channel = this.channels[name] = new Channel(name, this);
  }
  return channel;
};


destroyChannel

channelService.prototype.destroyChannel()

  • 类型:方法
  • 参数
    • name - channel 名

销毁指定名称的 channel。

源码

ChannelService.prototype.destroyChannel = function(name) {
  delete this.channels[name];
};


pushMessageByUids

channelService.prototype.pushMessageByUids()

  • 类型:方法
  • 参数
    • route - 消息路由
    • msg - 消息
    • uids - 接收消息的用户信息列表,其格式为: [{uid: userId, sid: frontendServerId}]
    • opts - 可选,用户定义的推送选项
    • cb - 回调函数,其签名为cb(err)

向指定的uids推送消息。

源码

ChannelService.prototype.pushMessageByUids = function(route, msg, uids, opts, cb) {
  if(typeof route !== 'string') {
    cb = opts;
    opts = uids;
    uids = msg;
    msg = route;
    route = msg.route;
  }

  if(!cb && typeof opts === 'function') {
    cb = opts;
    opts = {};
  }

  if(!uids || uids.length === 0) {
    utils.invokeCallback(cb, new Error('uids should not be empty'));
    return;
  }
  var groups = {}, record;
  for(var i=0, l=uids.length; i < l; i++) {
    record = uids[i];
    add(record.uid, record.sid, groups);
  }

  sendMessageByGroup(this, route, msg, groups, opts, cb);
};


broadcast

channelService.prototype.broadcast()

  • 类型:方法
  • 参数
    • stype - 字符串,前端服务器类型
    • route - 消息路由
    • msg - 消息
    • opts - 可选,用户定义的推送选项
    • cb - 回调函数,其签名为cb(err)

向所有已连接的推送消息。

源码

ChannelService.prototype.broadcast = function(stype, route, msg, opts, cb) {
  var app = this.app;
  var namespace = 'sys';
  var service = 'channelRemote';
  var method = 'broadcast';
  var servers = app.getServersByType(stype);

  if(!servers || servers.length === 0) {
    // server list is empty
    utils.invokeCallback(cb);
    return;
  }

  var count = servers.length;
  var successFlag = false;

  var latch = countDownLatch.createCountDownLatch(count, function() {
    if(!successFlag) {
      utils.invokeCallback(cb, new Error('broadcast fails'));
      return;
    }
    utils.invokeCallback(cb, null);
  });

  var genCB = function() {
    return function(err) {
      if(err) {
        logger.error('[broadcast] fail to push message, err:' + err.stack);
        latch.done();
        return;
      }
      successFlag = true;
      latch.done();
    };
  };

  opts = {type: 'broadcast', userOptions: opts || {}};

  // for compatiblity 
  opts.isBroadcast = true;
  if(opts.userOptions) {
    opts.binded = opts.userOptions.binded;
    opts.filterParam = opts.userOptions.filterParam;
  }

  for(var i=0, l=count; i < l; i++) {
    app.rpcInvoke(servers[i].id, {namespace: namespace, service: service,
      method: method, args: [route, msg, opts]}, genCB());
  }
};


7. Channel

Channel维持已订阅的接收者集合。你将可以添加用户到一个channel中,并通过 channel 广播消息。


add

channelService.prototype.add()

  • 类型:方法
  • 参数
    • uid - 要添加的用户的 id
    • sid - 用户连接的前端服务器的id

添加用户到 channel 中

源码

Channel.prototype.add = function(uid, sid) {
  if(this.state > ST_INITED) {
    return false;
  } else {
    var res = add(uid, sid, this.groups);
    if(res) {
      this.records[uid] = {sid: sid, uid: uid};
    }
    return res;
  }
};


leave

channelService.prototype.leave()

  • 类型:方法
  • 参数
    • uid - 用户的 id
    • sid - 用户连接的前端服务器的id

将用户从 channel 中移除

源码

Channel.prototype.leave = function(uid, sid) {
  if(!uid || !sid) {
    return false;
  }
  delete this.records[uid];
  var res = deleteFrom(uid, sid, this.groups[sid]);
  if(this.groups[sid] && this.groups[sid].length === 0) {
    delete this.groups[sid];
  }
  return res;
};


getMembers

channelService.prototype.getMembers()

  • 类型:方法

获取 channel 中的所有成员(用户)

注意:本操作较重,应少用或不用

源码

Channel.prototype.getMembers = function() {
  var res = [], groups = this.groups;
  var group, i, l;
  for(var sid in groups) {
    group = groups[sid];
    for(i=0, l=group.length; i < l; i++) {
      res.push(group[i]);
    }
  }
  return res;
};


getMember

channelService.prototype.getMember()

  • 类型:方法
  • 参数
    • uid - 用户的 id

获取 channel 中指定用户的信息

源码

Channel.prototype.getMember = function(uid) {
  return this.records[uid];
};


destroy

channelService.prototype.destroy()

  • 类型:方法

销毁指定的 channel

源码

Channel.prototype.destroy = function() {
  this.state = ST_DESTROYED;
  this.__channelService__.destroyChannel(this.name);
};


pushMessage

channelService.prototype.pushMessage()

  • 类型:方法
  • 参数
    • route - 消息路由
    • msg - 消息
    • opts - 可选,用户定义的推送选项
    • cb - 回调函数,其签名为cb(err)

将消息推送到 channel 中的所有成员

源码

Channel.prototype.pushMessage = function(route, msg, opts, cb) {
  if(this.state !== ST_INITED) {
    utils.invokeCallback(new Error('channel is not running now'));
    return;
  }

  if(typeof route !== 'string') {
    cb = opts;
    opts = msg;
    msg = route;
    route = msg.route;
  }

  if(!cb && typeof opts === 'function') {
    cb = opts;
    opts = {};
  }

  sendMessageByGroup(this.__channelService__, route, msg, this.groups, opts, cb);
};