Skip to content

Commit

Permalink
[perf] Use pattern matching at the namespace level (#217)
Browse files Browse the repository at this point in the history
This follows #46. Each node will now listen to only three channels:

- `socket.io#<namespace>#*`: used when broadcasting
- `socket.io-request#<namespace>#`: used for requesting information (ex: get every room in the cluster)
- `socket.io-response#<namespace>#`: used for responding to requests

We keep the benefits of #46 since:

- messages from other namespaces are ignored
- when emitting to a single room, the message is sent to
  `socket.io#<namespace>#<my-room>`, so listeners can check whether they
  have the room before unpacking the message (which is CPU consuming).

But there is no need to subscribe / unsubscribe every time a socket
joins or leaves a room (which is also CPU consuming when there are
thousands of subscriptions).
  • Loading branch information
darrachequesne authored May 10, 2017
1 parent d3d000b commit 05f926e
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 133 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
language: node_js
sudo: false
node_js:
- "0.10"
- "0.12"
- "4"
- "6"
- "node"
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ The following options are allowed:
- `key`: the name of the key to pub/sub events on as prefix (`socket.io`)
- `host`: host to connect to redis on (`localhost`)
- `port`: port to connect to redis on (`6379`)
- `subEvent`: optional, the redis client event name to subscribe to (`messageBuffer`)
- `pubClient`: optional, the redis client to publish events on
- `subClient`: optional, the redis client to subscribe to events on
- `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`)
- `withChannelMultiplexing`: optional, whether channel multiplexing is enabled (a new subscription will be trigggered for each room) (`true`)

If you decide to supply `pubClient` and `subClient`, make sure you use
[node_redis](https://github.com/mranney/node_redis) as a client or one
Expand Down
146 changes: 29 additions & 117 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ var redis = require('redis').createClient;
var msgpack = require('msgpack-lite');
var Adapter = require('socket.io-adapter');
var debug = require('debug')('socket.io-redis');
var async = require('async');

/**
* Module exports.
Expand Down Expand Up @@ -50,11 +49,8 @@ function adapter(uri, opts) {
// opts
var pub = opts.pubClient;
var sub = opts.subClient;

var prefix = opts.key || 'socket.io';
var subEvent = opts.subEvent || 'messageBuffer';
var requestsTimeout = opts.requestsTimeout || 1000;
var withChannelMultiplexing = false !== opts.withChannelMultiplexing;

// init clients if needed
function createClient() {
Expand Down Expand Up @@ -85,7 +81,6 @@ function adapter(uri, opts) {
this.uid = uid;
this.prefix = prefix;
this.requestsTimeout = requestsTimeout;
this.withChannelMultiplexing = withChannelMultiplexing;

this.channel = prefix + '#' + nsp.name + '#';
this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
Expand All @@ -107,11 +102,17 @@ function adapter(uri, opts) {

var self = this;

sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){
sub.psubscribe(this.channel + '*', function(err){
if (err) self.emit('error', err);
});

sub.on('pmessageBuffer', this.onmessage.bind(this));

sub.subscribe([this.requestChannel, this.responseChannel], function(err){
if (err) self.emit('error', err);
});

sub.on(subEvent, this.onmessage.bind(this));
sub.on('messageBuffer', this.onrequest.bind(this));

function onError(err) {
self.emit('error', err);
Expand All @@ -132,21 +133,22 @@ function adapter(uri, opts) {
* @api private
*/

Redis.prototype.onmessage = function(channel, msg){
Redis.prototype.onmessage = function(pattern, channel, msg){
channel = channel.toString();

if (this.channelMatches(channel, this.requestChannel)) {
return this.onrequest(channel, msg);
} else if (this.channelMatches(channel, this.responseChannel)) {
return this.onresponse(channel, msg);
} else if (!this.channelMatches(channel, this.channel)) {
if (!this.channelMatches(channel, this.channel)) {
return debug('ignore different channel');
}

var room = channel.substring(this.channel.length);
if (room !== '' && !this.rooms.hasOwnProperty(room)) {
return debug('ignore unknown room %s', room);
}

var args = msgpack.decode(msg);
var packet;

if (uid == args.shift()) return debug('ignore same uid');
if (uid === args.shift()) return debug('ignore same uid');

packet = args[0];

Expand All @@ -170,6 +172,14 @@ function adapter(uri, opts) {
*/

Redis.prototype.onrequest = function(channel, msg){
channel = channel.toString();

if (this.channelMatches(channel, this.responseChannel)) {
return this.onresponse(channel, msg);
} else if (!this.channelMatches(channel, this.requestChannel)) {
return debug('ignore different channel');
}

var self = this;
var request;

Expand Down Expand Up @@ -394,116 +404,15 @@ function adapter(uri, opts) {
packet.nsp = this.nsp.name;
if (!(remote || (opts && opts.flags && opts.flags.local))) {
var msg = msgpack.encode([uid, packet, opts]);
if (this.withChannelMultiplexing && opts.rooms && opts.rooms.length === 1) {
pub.publish(this.channel + opts.rooms[0] + '#', msg);
if (opts.rooms && opts.rooms.length === 1) {
pub.publish(this.channel + opts.rooms[0], msg);
} else {
pub.publish(this.channel, msg);
}
}
Adapter.prototype.broadcast.call(this, packet, opts);
};

/**
* Subscribe client to room messages.
*
* @param {String} client id
* @param {String} room
* @param {Function} callback (optional)
* @api public
*/

Redis.prototype.add = function(id, room, fn){
debug('adding %s to %s ', id, room);
var self = this;
// subscribe only once per room
var alreadyHasRoom = this.rooms.hasOwnProperty(room);
Adapter.prototype.add.call(this, id, room);

if (!this.withChannelMultiplexing || alreadyHasRoom) {
if (fn) fn(null);
return;
}

var channel = this.channel + room + '#';

function onSubscribe(err) {
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}
if (fn) fn(null);
}

sub.subscribe(channel, onSubscribe);
};

/**
* Unsubscribe client from room messages.
*
* @param {String} session id
* @param {String} room id
* @param {Function} callback (optional)
* @api public
*/

Redis.prototype.del = function(id, room, fn){
debug('removing %s from %s', id, room);

var self = this;
var hasRoom = this.rooms.hasOwnProperty(room);
Adapter.prototype.del.call(this, id, room);

if (this.withChannelMultiplexing && hasRoom && !this.rooms[room]) {
var channel = this.channel + room + '#';

function onUnsubscribe(err) {
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}
if (fn) fn(null);
}

sub.unsubscribe(channel, onUnsubscribe);
} else {
if (fn) process.nextTick(fn.bind(null, null));
}
};

/**
* Unsubscribe client completely.
*
* @param {String} client id
* @param {Function} callback (optional)
* @api public
*/

Redis.prototype.delAll = function(id, fn){
debug('removing %s from all rooms', id);

var self = this;
var rooms = this.sids[id];

if (!rooms) {
if (fn) process.nextTick(fn.bind(null, null));
return;
}

async.each(Object.keys(rooms), function(room, next){
self.del(id, room, next);
}, function(err){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}
delete self.sids[id];
if (fn) fn(null);
});
};

/**
* Gets a list of clients by sid.
*
Expand Down Expand Up @@ -531,6 +440,7 @@ function adapter(uri, opts) {
}

numsub = parseInt(numsub[1], 10);
debug('waiting for %d responses to "clients" request', numsub);

var request = JSON.stringify({
requestid : requestid,
Expand Down Expand Up @@ -619,6 +529,7 @@ function adapter(uri, opts) {
}

numsub = parseInt(numsub[1], 10);
debug('waiting for %d responses to "allRooms" request', numsub);

var request = JSON.stringify({
requestid : requestid,
Expand Down Expand Up @@ -794,6 +705,7 @@ function adapter(uri, opts) {
}

numsub = parseInt(numsub[1], 10);
debug('waiting for %d responses to "customRequest" request', numsub);

var request = JSON.stringify({
requestid : requestid,
Expand Down
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
"test": "mocha"
},
"dependencies": {
"async": "2.1.4",
"debug": "2.3.3",
"msgpack-lite": "0.1.26",
"redis": "2.6.3",
"socket.io-adapter": "0.5.0",
"socket.io-adapter": "~1.1.0",
"uid2": "0.0.3"
},
"devDependencies": {
"expect.js": "0.3.1",
"ioredis": "2.5.0",
"mocha": "3.2.0",
"socket.io": "1.7.x",
"socket.io-client": "1.7.x"
"socket.io": "latest",
"socket.io-client": "latest"
}
}
34 changes: 26 additions & 8 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ var socket1, socket2, socket3;
{
name: 'socket.io-redis'
},
{
name: 'socket.io-redis without channel multiplexing',
options: {
withChannelMultiplexing: false
}
},
{
name: 'socket.io-redis with ioredis',
options: function () {
Expand Down Expand Up @@ -152,7 +146,7 @@ var socket1, socket2, socket3;
it('deletes rooms upon disconnection', function(done){
socket1.join('woot');
socket1.on('disconnect', function() {
expect(socket1.adapter.sids[socket1.id]).to.be.empty();
expect(socket1.adapter.sids[socket1.id]).to.be(undefined);
expect(socket1.adapter.rooms).to.be.empty();
client1.disconnect();
done();
Expand All @@ -175,6 +169,26 @@ var socket1, socket2, socket3;
});
});

it('ignores messages from unknown channels', function(done){
namespace1.adapter.subClient.psubscribe('f?o', function () {
namespace3.adapter.pubClient.publish('foo', 'bar');
});

namespace1.adapter.subClient.on('pmessageBuffer', function () {
setTimeout(done, 50);
});
});

it('ignores messages from unknown channels (2)', function(done){
namespace1.adapter.subClient.subscribe('woot', function () {
namespace3.adapter.pubClient.publish('woot', 'toow');
});

namespace1.adapter.subClient.on('messageBuffer', function () {
setTimeout(done, 50);
});
});

describe('rooms', function () {
it('returns rooms of a given client', function(done){
socket1.join('woot1', function () {
Expand All @@ -200,6 +214,7 @@ var socket1, socket2, socket3;
it('returns all rooms accross several nodes', function(done){
socket1.join('woot1', function () {
namespace1.adapter.allRooms(function(err, rooms){
expect(err).to.be(null);
expect(rooms).to.have.length(4);
expect(rooms).to.contain(socket1.id);
expect(rooms).to.contain(socket2.id);
Expand All @@ -212,6 +227,7 @@ var socket1, socket2, socket3;

it('makes a given socket join a room', function(done){
namespace3.adapter.remoteJoin(socket1.id, 'woot3', function(err){
expect(err).to.be(null);
var rooms = Object.keys(socket1.rooms);
expect(rooms).to.have.length(2);
expect(rooms).to.contain('woot3');
Expand All @@ -222,6 +238,7 @@ var socket1, socket2, socket3;
it('makes a given socket leave a room', function(done){
socket1.join('woot3', function(){
namespace3.adapter.remoteLeave(socket1.id, 'woot3', function(err){
expect(err).to.be(null);
var rooms = Object.keys(socket1.rooms);
expect(rooms).to.have.length(1);
expect(rooms).not.to.contain('woot3');
Expand All @@ -237,6 +254,7 @@ var socket1, socket2, socket3;
}

namespace3.adapter.customRequest('hello', function(err, replies){
expect(err).to.be(null);
expect(replies).to.have.length(3);
expect(replies).to.contain(namespace1.adapter.uid);
done();
Expand Down Expand Up @@ -297,7 +315,7 @@ function init(options){
socket1 = _socket1;
socket2 = _socket2;
socket3 = _socket3;
done();
setTimeout(done, 100);
});
});
});
Expand Down

0 comments on commit 05f926e

Please sign in to comment.