Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer group leader now periodically checks for new topics/partitio… #1057

Merged
merged 1 commit into from
Sep 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 118 additions & 34 deletions lib/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const ConsumerGroupRecovery = require('./consumerGroupRecovery');
const Heartbeat = require('./consumerGroupHeartbeat');
const createTopicPartitionList = require('./utils').createTopicPartitionList;
const errors = require('./errors');
const NestedError = require('nested-error-stacks');

const assert = require('assert');
const builtInProtocols = require('./assignment');
Expand Down Expand Up @@ -49,6 +50,7 @@ const DEFAULTS = {
migrateHLC: false,
migrateRolling: true,
onRebalance: null,
topicPartitionCheckInterval: 30000,
protocol: ['roundrobin']
};

Expand Down Expand Up @@ -260,7 +262,11 @@ ConsumerGroup.prototype.assignPartitions = function (protocol, groupMembers, cal
}

var self = this;
var topics = _(groupMembers).map('subscription').flatten().uniq().value();
var topics = _(groupMembers)
.map('subscription')
.flatten()
.uniq()
.value();

async.waterfall(
[
Expand All @@ -271,6 +277,9 @@ ConsumerGroup.prototype.assignPartitions = function (protocol, groupMembers, cal

function (metadataResponse, callback) {
var metadata = mapTopicToPartitions(metadataResponse[1].metadata);

self.topicPartitionLength = createTopicPartitionLength(metadata, _.difference(topics, Object.keys(metadata)));

logger.debug('mapTopicToPartitions', metadata);
protocol.assign(metadata, groupMembers, callback);
}
Expand All @@ -279,6 +288,68 @@ ConsumerGroup.prototype.assignPartitions = function (protocol, groupMembers, cal
);
};

function createTopicPartitionLength (metadata, emptyTopics) {
const topicPartitionLength = {};
_.forOwn(metadata, function (value, key) {
topicPartitionLength[key] = value.length;
});

for (const topic of emptyTopics) {
if (topic in topicPartitionLength) {
throw new Error(`Topic ${topic} is not empty`);
}
topicPartitionLength[topic] = 0;
}
return topicPartitionLength;
}

ConsumerGroup.prototype.scheduleTopicPartitionCheck = function () {
if (this.isLeader && !this.topicPartitionCheckTimer) {
logger.debug(`${this.client.clientId} is leader scheduled new topic/partition check`);
this.topicPartitionCheckTimer = setTimeout(() => {
this.topicPartitionCheckTimer = null;
logger.debug('checking for new topics and partitions');
this._checkTopicPartitionChange((error, changed) => {
if (error) {
return this.emit('error', new NestedError('topic/partition change check failed', error));
}

if (changed) {
logger.debug('Topic/Partitions has changed');
async.series([
callback =>
this.options.autoCommit && this.generationId != null && this.memberId
? this.commit(true, callback)
: callback(null),
callback => this.leaveGroup(callback),
callback => {
this.connect();
callback(null);
}
]);
} else {
logger.debug('no new Topic/Partitions');
this.scheduleTopicPartitionCheck();
}
});
}, this.options.topicPartitionCheckInterval);
}
};

ConsumerGroup.prototype._checkTopicPartitionChange = function (callback) {
this.client.loadMetadataForTopics(this.topics, (error, metadataResponse) => {
if (error) {
return callback(error);
}
const metadata = mapTopicToPartitions(metadataResponse[1].metadata);

const topicOrPartitionsChanged = _.some(this.topicPartitionLength, function (numberOfPartitions, topic) {
return numberOfPartitions !== _.get(metadata, `[${topic}].length`, 0);
});
callback(null, topicOrPartitionsChanged);
});
};

function mapTopicToPartitions (metadata) {
return _.mapValues(metadata, Object.keys);
}
Expand All @@ -304,10 +375,12 @@ ConsumerGroup.prototype.handleJoinGroup = function (joinGroupResponse, callback)

ConsumerGroup.prototype.saveDefaultOffsets = function (topicPartitionList, callback) {
var self = this;
const offsetPayload = _(topicPartitionList).cloneDeep().map(tp => {
tp.time = ACCEPTED_FROM_OFFSET[this.options.fromOffset];
return tp;
});
const offsetPayload = _(topicPartitionList)
.cloneDeep()
.map(tp => {
tp.time = ACCEPTED_FROM_OFFSET[this.options.fromOffset];
return tp;
});

self.getOffset().fetch(offsetPayload, function (error, result) {
if (error) {
Expand Down Expand Up @@ -349,8 +422,9 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
if (self.options.fromOffset === 'none') {
return callback(
new Error(
`${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${self
.options.groupId}'`
`${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${
self.options.groupId
}'`
)
);
}
Expand Down Expand Up @@ -402,7 +476,7 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
return p;
});
if (noOffset && self.options.commitOffsetsOnFirstJoin) {
self.commit(true, (err) => {
self.commit(true, err => {
callback(err, !err ? true : null);
});
} else {
Expand Down Expand Up @@ -537,6 +611,7 @@ ConsumerGroup.prototype.connect = function () {
self.clearPendingFetches();
self.fetch();
}
self.scheduleTopicPartitionCheck();
self.startHeartbeats();
self.emit('connect');
self.emit('rebalanced');
Expand Down Expand Up @@ -645,44 +720,53 @@ ConsumerGroup.prototype.addTopics = function (topics, cb) {
return;
}

async.series([
callback => this.client.topicExists(topics, callback),
callback => (this.options.autoCommit && this.generationId != null && this.memberId)
? this.commit(true, callback)
: callback(null),
callback => this.leaveGroup(callback),
callback => {
this.topics = this.topics.concat(topics);
this.setupProtocols(this.options.protocol);
this.connect();
callback(null);
}
], error => error ? cb(error) : cb(null, `Add Topics ${topics.join(',')} Successfully`));
async.series(
[
callback => this.client.topicExists(topics, callback),
callback =>
this.options.autoCommit && this.generationId != null && this.memberId
? this.commit(true, callback)
: callback(null),
callback => this.leaveGroup(callback),
callback => {
this.topics = this.topics.concat(topics);
this.setupProtocols(this.options.protocol);
this.connect();
callback(null);
}
],
error => (error ? cb(error) : cb(null, `Add Topics ${topics.join(',')} Successfully`))
);
};

ConsumerGroup.prototype.removeTopics = function (topics, cb) {
topics = typeof topics === 'string' ? [topics] : topics;

async.series([
callback => this.client.topicExists(topics, callback),
callback => (this.options.autoCommit && this.generationId != null && this.memberId)
? this.commit(true, callback)
: callback(null),
callback => this.leaveGroup(callback),
callback => {
this.topics = _.difference(this.topics, topics);
this.setupProtocols(this.options.protocol);
this.connect();
callback(null);
}
], error => error ? cb(error) : cb(null, `Remove Topics ${topics.join(',')} Successfully`));
async.series(
[
callback => this.client.topicExists(topics, callback),
callback =>
this.options.autoCommit && this.generationId != null && this.memberId
? this.commit(true, callback)
: callback(null),
callback => this.leaveGroup(callback),
callback => {
this.topics = _.difference(this.topics, topics);
this.setupProtocols(this.options.protocol);
this.connect();
callback(null);
}
],
error => (error ? cb(error) : cb(null, `Remove Topics ${topics.join(',')} Successfully`))
);
};

ConsumerGroup.prototype.close = function (force, cb) {
var self = this;
this.ready = false;

this.stopHeartbeats();
clearTimeout(this.topicPartitionCheckTimer);

if (typeof force === 'function') {
cb = force;
Expand Down
Loading