From 87aa99b53507a364020f6e8693ddc71f205d890c Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Sun, 2 Sep 2018 12:05:33 -0400 Subject: [PATCH] Consumer group leader now periodically checks for new topics/partitions this closes #887, closes #1056, --- lib/consumerGroup.js | 152 ++++++++++++++++++++++++++-------- test/test.consumerGroup.js | 164 +++++++++++++++++++++++++++++++++++-- 2 files changed, 274 insertions(+), 42 deletions(-) diff --git a/lib/consumerGroup.js b/lib/consumerGroup.js index daedb0d7..2b32d682 100644 --- a/lib/consumerGroup.js +++ b/lib/consumerGroup.js @@ -14,6 +14,7 @@ const ConsumerGroupRecovery = require('./consumerGroupRecovery'); const Heartbeat = require('./consumerGroupHeartbeat'); const createTopicPartitionList = require('./utils').createTopicPartitionList; const errors = require('./errors'); +const NestedError = require('nested-error-stacks'); const assert = require('assert'); const builtInProtocols = require('./assignment'); @@ -49,6 +50,7 @@ const DEFAULTS = { migrateHLC: false, migrateRolling: true, onRebalance: null, + topicPartitionCheckInterval: 30000, protocol: ['roundrobin'] }; @@ -260,7 +262,11 @@ ConsumerGroup.prototype.assignPartitions = function (protocol, groupMembers, cal } var self = this; - var topics = _(groupMembers).map('subscription').flatten().uniq().value(); + var topics = _(groupMembers) + .map('subscription') + .flatten() + .uniq() + .value(); async.waterfall( [ @@ -271,6 +277,9 @@ ConsumerGroup.prototype.assignPartitions = function (protocol, groupMembers, cal function (metadataResponse, callback) { var metadata = mapTopicToPartitions(metadataResponse[1].metadata); + + self.topicPartitionLength = createTopicPartitionLength(metadata, _.difference(topics, Object.keys(metadata))); + logger.debug('mapTopicToPartitions', metadata); protocol.assign(metadata, groupMembers, callback); } @@ -279,6 +288,68 @@ ConsumerGroup.prototype.assignPartitions = function (protocol, groupMembers, cal ); }; +function createTopicPartitionLength (metadata, emptyTopics) { + const topicPartitionLength = {}; + _.forOwn(metadata, function (value, key) { + topicPartitionLength[key] = value.length; + }); + + for (const topic of emptyTopics) { + if (topic in topicPartitionLength) { + throw new Error(`Topic ${topic} is not empty`); + } + topicPartitionLength[topic] = 0; + } + return topicPartitionLength; +} + +ConsumerGroup.prototype.scheduleTopicPartitionCheck = function () { + if (this.isLeader && !this.topicPartitionCheckTimer) { + logger.debug(`${this.client.clientId} is leader scheduled new topic/partition check`); + this.topicPartitionCheckTimer = setTimeout(() => { + this.topicPartitionCheckTimer = null; + logger.debug('checking for new topics and partitions'); + this._checkTopicPartitionChange((error, changed) => { + if (error) { + return this.emit('error', new NestedError('topic/partition change check failed', error)); + } + + if (changed) { + logger.debug('Topic/Partitions has changed'); + async.series([ + callback => + this.options.autoCommit && this.generationId != null && this.memberId + ? this.commit(true, callback) + : callback(null), + callback => this.leaveGroup(callback), + callback => { + this.connect(); + callback(null); + } + ]); + } else { + logger.debug('no new Topic/Partitions'); + this.scheduleTopicPartitionCheck(); + } + }); + }, this.options.topicPartitionCheckInterval); + } +}; + +ConsumerGroup.prototype._checkTopicPartitionChange = function (callback) { + this.client.loadMetadataForTopics(this.topics, (error, metadataResponse) => { + if (error) { + return callback(error); + } + const metadata = mapTopicToPartitions(metadataResponse[1].metadata); + + const topicOrPartitionsChanged = _.some(this.topicPartitionLength, function (numberOfPartitions, topic) { + return numberOfPartitions !== _.get(metadata, `[${topic}].length`, 0); + }); + callback(null, topicOrPartitionsChanged); + }); +}; + function mapTopicToPartitions (metadata) { return _.mapValues(metadata, Object.keys); } @@ -304,10 +375,12 @@ ConsumerGroup.prototype.handleJoinGroup = function (joinGroupResponse, callback) ConsumerGroup.prototype.saveDefaultOffsets = function (topicPartitionList, callback) { var self = this; - const offsetPayload = _(topicPartitionList).cloneDeep().map(tp => { - tp.time = ACCEPTED_FROM_OFFSET[this.options.fromOffset]; - return tp; - }); + const offsetPayload = _(topicPartitionList) + .cloneDeep() + .map(tp => { + tp.time = ACCEPTED_FROM_OFFSET[this.options.fromOffset]; + return tp; + }); self.getOffset().fetch(offsetPayload, function (error, result) { if (error) { @@ -349,8 +422,9 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback) if (self.options.fromOffset === 'none') { return callback( new Error( - `${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${self - .options.groupId}'` + `${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${ + self.options.groupId + }'` ) ); } @@ -402,7 +476,7 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback) return p; }); if (noOffset && self.options.commitOffsetsOnFirstJoin) { - self.commit(true, (err) => { + self.commit(true, err => { callback(err, !err ? true : null); }); } else { @@ -537,6 +611,7 @@ ConsumerGroup.prototype.connect = function () { self.clearPendingFetches(); self.fetch(); } + self.scheduleTopicPartitionCheck(); self.startHeartbeats(); self.emit('connect'); self.emit('rebalanced'); @@ -645,37 +720,45 @@ ConsumerGroup.prototype.addTopics = function (topics, cb) { return; } - async.series([ - callback => this.client.topicExists(topics, callback), - callback => (this.options.autoCommit && this.generationId != null && this.memberId) - ? this.commit(true, callback) - : callback(null), - callback => this.leaveGroup(callback), - callback => { - this.topics = this.topics.concat(topics); - this.setupProtocols(this.options.protocol); - this.connect(); - callback(null); - } - ], error => error ? cb(error) : cb(null, `Add Topics ${topics.join(',')} Successfully`)); + async.series( + [ + callback => this.client.topicExists(topics, callback), + callback => + this.options.autoCommit && this.generationId != null && this.memberId + ? this.commit(true, callback) + : callback(null), + callback => this.leaveGroup(callback), + callback => { + this.topics = this.topics.concat(topics); + this.setupProtocols(this.options.protocol); + this.connect(); + callback(null); + } + ], + error => (error ? cb(error) : cb(null, `Add Topics ${topics.join(',')} Successfully`)) + ); }; ConsumerGroup.prototype.removeTopics = function (topics, cb) { topics = typeof topics === 'string' ? [topics] : topics; - async.series([ - callback => this.client.topicExists(topics, callback), - callback => (this.options.autoCommit && this.generationId != null && this.memberId) - ? this.commit(true, callback) - : callback(null), - callback => this.leaveGroup(callback), - callback => { - this.topics = _.difference(this.topics, topics); - this.setupProtocols(this.options.protocol); - this.connect(); - callback(null); - } - ], error => error ? cb(error) : cb(null, `Remove Topics ${topics.join(',')} Successfully`)); + async.series( + [ + callback => this.client.topicExists(topics, callback), + callback => + this.options.autoCommit && this.generationId != null && this.memberId + ? this.commit(true, callback) + : callback(null), + callback => this.leaveGroup(callback), + callback => { + this.topics = _.difference(this.topics, topics); + this.setupProtocols(this.options.protocol); + this.connect(); + callback(null); + } + ], + error => (error ? cb(error) : cb(null, `Remove Topics ${topics.join(',')} Successfully`)) + ); }; ConsumerGroup.prototype.close = function (force, cb) { @@ -683,6 +766,7 @@ ConsumerGroup.prototype.close = function (force, cb) { this.ready = false; this.stopHeartbeats(); + clearTimeout(this.topicPartitionCheckTimer); if (typeof force === 'function') { cb = force; diff --git a/test/test.consumerGroup.js b/test/test.consumerGroup.js index 0e52201e..7e48dc05 100644 --- a/test/test.consumerGroup.js +++ b/test/test.consumerGroup.js @@ -145,6 +145,148 @@ describe('ConsumerGroup', function () { }); }); + describe('Topic partition change detection', function () { + let ConsumerGroup = null; + let consumerGroup = null; + let sandbox = null; + + const fakeClient = new EventEmitter(); + fakeClient.loadMetadataForTopics = function () {}; + + const FakeClient = function () { + return fakeClient; + }; + + beforeEach(function () { + sandbox = sinon.sandbox.create(); + ConsumerGroup = proxyquire('../lib/consumerGroup', { + './client': FakeClient + }); + + consumerGroup = new ConsumerGroup( + { + host: 'gibberish', + connectOnReady: false + }, + 'TestTopic' + ); + }); + + afterEach(function () { + sandbox.restore(); + }); + + describe('#scheduleTopicPartitionCheck', function () { + let clock; + beforeEach(function () { + clock = sandbox.useFakeTimers(); + }); + + it('should only have one schedule pending', function () { + const cgMock = sandbox.mock(consumerGroup); + consumerGroup.isLeader = true; + cgMock + .expects('_checkTopicPartitionChange') + .once() + .yields(null, true); + cgMock.expects('commit').never(); + cgMock + .expects('leaveGroup') + .once() + .yields(null); + cgMock.expects('connect').once(); + + consumerGroup.scheduleTopicPartitionCheck(); + consumerGroup.scheduleTopicPartitionCheck(); + + clock.tick(30000); + + cgMock.verify(); + }); + + it('should only schedule a check if consumer is a leader', function () { + const cgMock = sandbox.mock(consumerGroup); + + consumerGroup.isLeader = false; + + cgMock.expects('_checkTopicPartitionChange').never(); + cgMock.expects('leaveGroup').never(); + cgMock.expects('connect').never(); + cgMock.expects('commit').never(); + + consumerGroup.scheduleTopicPartitionCheck(); + clock.tick(30000); + cgMock.verify(); + }); + }); + + describe('#_checkTopicPartitionChange', function () { + it('should yield false when the topic/partition length are the same', function (done) { + sandbox.stub(consumerGroup.client, 'loadMetadataForTopics').yields(null, [ + 0, + { + metadata: { + aTopic: { + '0': {}, + '1': {} + }, + existingTopic: { + '0': {}, + '1': {}, + '2': {} + } + } + } + ]); + + consumerGroup.topicPartitionLength = { + aTopic: 2, + existingTopic: 3 + }; + + consumerGroup.topics = ['aTopic', 'existingTopic']; + + consumerGroup._checkTopicPartitionChange(function (error, changed) { + sinon.assert.calledOnce(consumerGroup.client.loadMetadataForTopics); + should(changed).be.false; + done(error); + }); + }); + + it('should yield true when the topic/partition length are different', function (done) { + sandbox.stub(consumerGroup.client, 'loadMetadataForTopics').yields(null, [ + 0, + { + metadata: { + nonExistantTopic: { + '0': {}, + '1': {} + }, + existingTopic: { + '0': {}, + '1': {}, + '2': {} + } + } + } + ]); + + consumerGroup.topicPartitionLength = { + nonExistantTopic: 0, + existingTopic: 3 + }; + + consumerGroup.topics = ['nonExistantTopic', 'existingTopic']; + + consumerGroup._checkTopicPartitionChange(function (error, changed) { + sinon.assert.calledOnce(consumerGroup.client.loadMetadataForTopics); + should(changed).be.true; + done(error); + }); + }); + }); + }); + describe('Broker offline recovery', function () { let sandbox = null; let consumerGroup = null; @@ -1058,10 +1200,13 @@ describe('ConsumerGroup', function () { topic = uuid.v4(); newTopic = uuid.v4(); testMessage = uuid.v4(); - consumerGroup = new ConsumerGroup({ - kafkaHost: host + ':9092', - groupId: uuid.v4() - }, topic); + consumerGroup = new ConsumerGroup( + { + kafkaHost: host + ':9092', + groupId: uuid.v4() + }, + topic + ); consumerGroup.once('connect', () => { consumerGroup.client.createTopics([topic, newTopic], done); }); @@ -1107,10 +1252,13 @@ describe('ConsumerGroup', function () { topic = uuid.v4(); newTopic = uuid.v4(); testMessage = uuid.v4(); - consumerGroup = new ConsumerGroup({ - kafkaHost: host + ':9092', - groupId: uuid.v4() - }, [topic, newTopic]); + consumerGroup = new ConsumerGroup( + { + kafkaHost: host + ':9092', + groupId: uuid.v4() + }, + [topic, newTopic] + ); consumerGroup.once('connect', () => { consumerGroup.client.createTopics([topic, newTopic], done); });