Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with zookeeper connection loss recovery #458

Merged
merged 5 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions lib/errors/FailedToRegisterConsumerError.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
var util = require('util');
var NestedError = require('nested-error-stacks');

/**
* Failed to register the consumer
*
* @param {String} message A message describing the problem with the registration of the consumer
* @param {Error} error An error related to the registration of the consumer
*
* @constructor
*/
var FailedToRegisterConsumerError = function (message) {
Error.captureStackTrace(this, this);
var FailedToRegisterConsumerError = function (message, nested) {
NestedError.call(this, message, nested);
this.message = message;
};

util.inherits(FailedToRegisterConsumerError, Error);
util.inherits(FailedToRegisterConsumerError, NestedError);
FailedToRegisterConsumerError.prototype.name = 'FailedToRegisterConsumerError';

module.exports = FailedToRegisterConsumerError;
57 changes: 40 additions & 17 deletions lib/highLevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,45 @@ HighLevelConsumer.prototype.connect = function () {
});
});

// Check partition ownership
function checkPartitionOwnership (callback) {
async.each(self.topicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
cbb(err);
} else {
cbb();
}
});
} else {
cbb();
}
}, callback);
}

// Check partition ownership and registration
this.checkPartitionOwnershipInterval = setInterval(function () {
if (!self.rebalancing) {
async.each(self.topicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
cbb(err);
async.parallel([
function (callback) {
checkPartitionOwnership(callback);
},

function (callback) {
self.client.zk.isConsumerRegistered(self.options.groupId, self.id, function (error, registered) {
if (error) {
return callback(error);
}
if (registered) {
callback();
} else {
cbb();
callback(new Error(util.format('Consumer %s is not registered in group %s', self.id, self.options.groupId)));
}
});
} else {
cbb();
}
}, function (err) {
if (err) {
self.emit('error', new errors.FailedToRegisterConsumerError(err.toString()));
], function (error) {
if (error) {
self.emit('error', new errors.FailedToRegisterConsumerError(error.toString(), error));
}
});
}
Expand Down Expand Up @@ -194,9 +215,7 @@ HighLevelConsumer.prototype.connect = function () {
}

// Wait for the consumer to be ready
this.on('registered', function () {
rebalance();
});
this.on('registered', rebalance);

function register (fn) {
debug('Registered listeners %s', self.id);
Expand Down Expand Up @@ -228,10 +247,14 @@ HighLevelConsumer.prototype.connect = function () {
attachZookeeperErrorListener();

this.client.on('zkReconnect', function () {
debug('zookeeper reconnect for %s', self.id);
attachZookeeperErrorListener();

self.registerConsumer(function () {
rebalance();
self.registerConsumer(function (error) {
if (error) {
return self.emit('error', new errors.FailedToRegisterConsumerError('Failed to register consumer on zkReconnect', error));
}
self.emit('registered');
});
});

Expand Down
10 changes: 10 additions & 0 deletions lib/zookeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,16 @@ Zookeeper.prototype.listBrokers = function (cb) {
);
};

Zookeeper.prototype.isConsumerRegistered = function (groupId, consumerId, callback) {
var path = '/consumers/' + groupId + '/ids/' + consumerId;
this.client.exists(path, function (error, stat) {
if (error) {
return callback(error);
}
callback(!!stat);
});
};

Zookeeper.prototype.listConsumers = function (groupId) {
var that = this;
var path = '/consumers/' + groupId + '/ids';
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"debug": "^2.1.3",
"lodash": ">3.0 <4.0",
"minimatch": "^3.0.2",
"nested-error-stacks": "^1.0.2",
"node-uuid": "~1.4.3",
"node-zookeeper-client": "~0.2.2",
"optional": "^0.1.3",
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/mockZookeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ function FakeZookeeper () {
this.deletePartitionOwnership = function (groupId, topic, partition, cb) {
setImmediate(cb);
};
this.isConsumerRegistered = function (groupId, id, cb) {
setImmediate(cb);
};
this.addPartitionOwnership = function (id, groupId, topic, partition, cb) {
setImmediate(cb);
};
Expand Down
169 changes: 165 additions & 4 deletions test/test.highlevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var HighLevelConsumer = require('../lib/highLevelConsumer');
var FakeClient = require('./mocks/mockClient');
var should = require('should');
var InvalidConfigError = require('../lib/errors/InvalidConfigError');
var FailedToRegisterConsumerError = require('../lib/errors/FailedToRegisterConsumerError');

describe('HighLevelConsumer', function () {
describe('#close', function (done) {
Expand Down Expand Up @@ -63,6 +64,7 @@ describe('HighLevelConsumer', function () {
client = new FakeClient();
unregisterSpy = sinon.spy(client.zk, 'unregisterConsumer');
consumer = new HighLevelConsumer(client, [], {groupId: 'mygroup'});
clearInterval(consumer.checkPartitionOwnershipInterval);
releasePartitionsStub = sinon.stub(consumer, '_releasePartitions').yields();
});

Expand All @@ -86,20 +88,32 @@ describe('HighLevelConsumer', function () {
});

describe('validate groupId', function () {
var clock;

beforeEach(function () {
clock = sinon.useFakeTimers();
});

afterEach(function () {
clock.restore();
});

function validateThrowsInvalidConfigError (groupId) {
var consumer;
should.throws(function () {
var client = new FakeClient();
// eslint-disable-next-line no-new
new HighLevelConsumer(client, [ { topic: 'some_topic' } ], {groupId: groupId});
consumer = new HighLevelConsumer(client, [ { topic: 'some_topic' } ], {groupId: groupId});
}, InvalidConfigError);
consumer && consumer.close();
}

function validateDoesNotThrowInvalidConfigError (groupId) {
var consumer;
should.doesNotThrow(function () {
var client = new FakeClient();
// eslint-disable-next-line no-new
new HighLevelConsumer(client, [ { topic: 'some_topic' } ], {groupId: groupId});
consumer = new HighLevelConsumer(client, [ { topic: 'some_topic' } ], {groupId: groupId});
});
consumer.close();
}

it('should throws an error on invalid group IDs', function () {
Expand All @@ -116,6 +130,55 @@ describe('HighLevelConsumer', function () {
});
});

describe('Reregister consumer on zookeeper reconnection', function () {
var client, consumer, sandbox;
var async = require('async');

beforeEach(function () {
client = new FakeClient();

consumer = new HighLevelConsumer(
client,
[ {topic: 'fake-topic'} ],
{groupId: 'zkReconnect-Test'}
);

sandbox = sinon.sandbox.create();
});

afterEach(function () {
consumer.close();
sandbox.restore();
consumer = null;
});

it('should try to register the consumer and emit error on failure', function (done) {
sandbox.stub(consumer, 'registerConsumer').yields(new Error('failed'));
consumer.once('error', function (error) {
error.should.be.an.instanceOf(FailedToRegisterConsumerError);
error.message.should.be.eql('Failed to register consumer on zkReconnect');
done();
});
client.emit('zkReconnect');
});

it('should register the consumer and emit registered on success', function (done) {
sandbox.stub(consumer, 'registerConsumer').yields(null);
async.parallel([
function (callback) {
consumer.once('registered', callback);
},
function (callback) {
consumer.once('rebalancing', callback);
}
], function () {
sinon.assert.calledOnce(consumer.registerConsumer);
done();
});
client.emit('zkReconnect');
});
});

describe('#setOffset', function () {
var client, highLevelConsumer;

Expand Down Expand Up @@ -151,6 +214,104 @@ describe('HighLevelConsumer', function () {
});
});

describe('ensure partition ownership and registration', function () {
var client, consumer, sandbox;
var twentySeconds = 20000;

beforeEach(function () {
sandbox = sinon.sandbox.create();
sandbox.useFakeTimers();
client = new FakeClient();

consumer = new HighLevelConsumer(
client,
[ {topic: 'fake-topic'} ]
);
});

afterEach(function () {
consumer.close();
sandbox.restore();
client = null;
consumer = null;
});

it('should emit an FailedToRegisterConsumerError when ownership changes', function (done) {
consumer.topicPayloads = [
{topic: 'fake-topic', partition: '0', offset: 0, maxBytes: 1048576, metadata: 'm'},
{topic: 'fake-topic', partition: '1', offset: 0, maxBytes: 1048576, metadata: 'm'}
];

var checkPartitionOwnershipStub = sandbox.stub(client.zk, 'checkPartitionOwnership');

checkPartitionOwnershipStub.withArgs(consumer.id, consumer.options.groupId, consumer.topicPayloads[0].topic,
consumer.topicPayloads[0].partition).yields(new Error('not owned'));

consumer.on('error', function (error) {
error.should.be.an.instanceOf(FailedToRegisterConsumerError);
error.message.should.be.eql('Error: not owned');
sinon.assert.calledOnce(checkPartitionOwnershipStub);
done();
});
sandbox.clock.tick(twentySeconds);
});

it('should emit an FailedToRegisterConsumerError when no longer registered', function (done) {
consumer.topicPayloads = [
{topic: 'fake-topic', partition: '0', offset: 0, maxBytes: 1048576, metadata: 'm'},
{topic: 'fake-topic', partition: '1', offset: 0, maxBytes: 1048576, metadata: 'm'}
];

sandbox.stub(client.zk, 'checkPartitionOwnership').yields();
sandbox.stub(client.zk, 'isConsumerRegistered').yields(null, false);

consumer.on('error', function (error) {
error.should.be.an.instanceOf(FailedToRegisterConsumerError);
error.message.should.be.eql('Error: Consumer ' + consumer.id + ' is not registered in group kafka-node-group');
sinon.assert.calledOnce(client.zk.isConsumerRegistered);
done();
});
sandbox.clock.tick(twentySeconds);
});

it('should emit an FailedToRegisterConsumerError when registered check fails', function (done) {
consumer.topicPayloads = [
{topic: 'fake-topic', partition: '0', offset: 0, maxBytes: 1048576, metadata: 'm'},
{topic: 'fake-topic', partition: '1', offset: 0, maxBytes: 1048576, metadata: 'm'}
];

sandbox.stub(client.zk, 'checkPartitionOwnership').yields();
sandbox.stub(client.zk, 'isConsumerRegistered').yields(new Error('CONNECTION_LOSS[-4]'));

consumer.on('error', function (error) {
error.should.be.an.instanceOf(FailedToRegisterConsumerError);
error.nested.should.be.an.instanceOf(Error);
error.nested.message.should.be.eql('CONNECTION_LOSS[-4]');
sinon.assert.calledOnce(client.zk.isConsumerRegistered);
sinon.assert.calledTwice(client.zk.checkPartitionOwnership);
done();
});
sandbox.clock.tick(twentySeconds);
});

it('should not emit an error if partition ownership checks succeeds', function (done) {
consumer.topicPayloads = [
{topic: 'fake-topic', partition: '0', offset: 0, maxBytes: 1048576, metadata: 'm'},
{topic: 'fake-topic', partition: '1', offset: 0, maxBytes: 1048576, metadata: 'm'}
];

var checkPartitionOwnershipStub = sandbox.stub(client.zk, 'checkPartitionOwnership').yields();
sandbox.stub(client.zk, 'isConsumerRegistered').yields(null, true);

sandbox.clock.tick(twentySeconds);
sandbox.clock.restore();
setImmediate(function () {
sinon.assert.calledTwice(checkPartitionOwnershipStub);
done();
});
});
});

describe('rebalance', function () {
var client,
highLevelConsumer,
Expand Down
Loading