Skip to content

Commit

Permalink
Fix potential missed rebalanced events (#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink authored Aug 9, 2016
1 parent 256ecda commit 9de0ae0
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 8 deletions.
40 changes: 32 additions & 8 deletions lib/highLevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var HighLevelConsumer = function (client, topics, options) {
this.closing = false;
this.paused = this.options.paused;
this.rebalancing = false;
this.pendingRebalances = 0;
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
this.payloads = this.buildPayloads(topics);
this.topicPayloads = this.buildTopicPayloads(topics);
Expand Down Expand Up @@ -197,18 +198,25 @@ HighLevelConsumer.prototype.connect = function () {
rebalance();
});

function register () {
function register (fn) {
debug('Registered listeners %s', self.id);
self.client.zk.on('consumersChanged', rebalance);
self.client.zk.on('partitionsChanged', rebalance);
self.client.on('brokersChanged', rebalance);
self.client.zk.on('consumersChanged', fn || rebalance);
self.client.zk.on('partitionsChanged', fn || rebalance);
self.client.on('brokersChanged', fn || rebalance);
}

function deregister () {
function deregister (fn) {
debug('Deregistered listeners %s', self.id);
self.client.zk.removeListener('consumersChanged', rebalance);
self.client.zk.removeListener('partitionsChanged', rebalance);
self.client.removeListener('brokersChanged', rebalance);
self.client.zk.removeListener('consumersChanged', fn || rebalance);
self.client.zk.removeListener('partitionsChanged', fn || rebalance);
self.client.removeListener('brokersChanged', fn || rebalance);
}

function pendingRebalance () {
if (self.rebalancing) {
self.pendingRebalances++;
debug('%s added a pendingRebalances %d', self.id, self.pendingRebalances);
}
}

function attachZookeeperErrorListener () {
Expand All @@ -227,6 +235,22 @@ HighLevelConsumer.prototype.connect = function () {
});
});

this.on('rebalanced', function () {
deregister(pendingRebalance);
if (self.pendingRebalances && !self.closing) {
debug('%s pendingRebalances is %d scheduling a rebalance...', self.id, self.pendingRebalances);
setTimeout(function () {
debug('%s running scheduled rebalance', self.id);
rebalance();
}, 500);
}
});

this.on('rebalancing', function () {
register(pendingRebalance);
self.pendingRebalances = 0;
});

this.client.on('error', function (err) {
self.emit('error', err);
});
Expand Down
38 changes: 38 additions & 0 deletions test/test.highlevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,44 @@ describe('HighLevelConsumer', function () {
highLevelConsumer = null;
});

describe('pending rebalances', function () {
it('should initalize pending rebalances to zero', function () {
highLevelConsumer.pendingRebalances.should.be.eql(0);
});

function verifyPendingRebalances (event, done) {
sandbox.stub(client, 'refreshMetadata', function (topicNames, callback) {
highLevelConsumer.rebalancing.should.be.true;
highLevelConsumer.pendingRebalances.should.be.eql(0);
event();
highLevelConsumer.pendingRebalances.should.be.eql(1);
setImmediate(callback);
});
client.emit('ready');
highLevelConsumer.once('rebalanced', function () {
sandbox.restore();
highLevelConsumer.pendingRebalances.should.be.eql(1);
highLevelConsumer.on('rebalanced', done);
});
}

it('should queue brokersChanged events during a rebalance', function (done) {
verifyPendingRebalances(function () {
client.emit('brokersChanged');
}, done);
});
it('should queue consumersChanged rebalance events during a rebalance', function (done) {
verifyPendingRebalances(function () {
client.zk.emit('consumersChanged');
}, done);
});
it('should queue partitionsChanged rebalance events during a rebalance', function (done) {
verifyPendingRebalances(function () {
client.zk.emit('partitionsChanged');
}, done);
});
});

it('should emit rebalanced event and clear rebalancing flag only after offsets are updated', function (done) {
client.emit('ready');

Expand Down
12 changes: 12 additions & 0 deletions test/test.rebalance.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ describe('Integrated HLC Rebalance', function () {
}, 500);
});

it('verify two consumer consumes all messages on all partitions after two out of the four consumers are killed right away', function (done) {
var messages = generateMessages(4, 'verify 4 c 2 killed');
var verify = getConsumerVerifier(messages, 3, 2, done);

rearer.setVerifier(topic, groupId, verify);
rearer.raise(4, function () {
rearer.kill(2, function () {
sendMessages(messages, done);
});
});
});

it('verify three consumer consumes all messages on all partitions after one that is unassigned is killed', function (done) {
var messages = generateMessages(3, 'verify 2 c 2 killed');
var verify = getConsumerVerifier(messages, 3, 3, done);
Expand Down

0 comments on commit 9de0ae0

Please sign in to comment.