Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix HLC rebalance issues and add tests #423

Merged
merged 10 commits into from
Aug 5, 2016
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
KAFKA_SSL_KEY_PASSWORD: "password"
KAFKA_SSL_TRUSTSTORE_LOCATION: "/var/private/ssl/certs/server.truststore.jks"
KAFKA_SSL_TRUSTSTORE_PASSWORD: "password"
KAFKA_CREATE_TOPICS: "RebalanceTopic:3:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./docker/certs:/var/private/ssl/certs
141 changes: 86 additions & 55 deletions lib/highLevelConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var HighLevelConsumer = function (client, topics, options) {
this.options = _.defaults((options || {}), DEFAULTS);
this.initialised = false;
this.ready = false;
this.closing = false;
this.paused = this.options.paused;
this.rebalancing = false;
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
Expand Down Expand Up @@ -133,7 +134,8 @@ HighLevelConsumer.prototype.connect = function () {
}

function rebalance () {
if (!self.rebalancing) {
debug('rebalance() %s is rebalancing: %s ready: %s', self.id, self.rebalancing, self.ready);
if (!self.rebalancing && !self.closing) {
deregister();

self.emit('rebalancing');
Expand Down Expand Up @@ -166,7 +168,10 @@ HighLevelConsumer.prototype.connect = function () {
if (err) {
self.rebalancing = false;
self.emit('error', err);
} else {
return;
}

if (self.topicPayloads.length) {
fetchAndUpdateOffsets(function (err) {
self.rebalancing = false;
if (err) {
Expand All @@ -176,6 +181,9 @@ HighLevelConsumer.prototype.connect = function () {
self.fetch();
self.emit('rebalanced');
});
} else { // was not assigned any partitions during rebalance
self.rebalancing = false;
self.emit('rebalanced');
}
});
}
Expand All @@ -190,14 +198,14 @@ HighLevelConsumer.prototype.connect = function () {
});

function register () {
debug('Registered listeners');
debug('Registered listeners %s', self.id);
// Register for re-balances (broker or consumer changes)
self.client.zk.on('consumersChanged', rebalance);
self.client.on('brokersChanged', rebalance);
}

function deregister () {
debug('Deregistered listeners');
debug('Deregistered listeners %s', self.id);
// Register for re-balances (broker or consumer changes)
self.client.zk.removeListener('consumersChanged', rebalance);
self.client.removeListener('brokersChanged', rebalance);
Expand Down Expand Up @@ -259,6 +267,38 @@ HighLevelConsumer.prototype.connect = function () {
});
};

HighLevelConsumer.prototype._releasePartitions = function (topicPayloads, callback) {
var self = this;
async.each(topicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
async.series([
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
// Partition doesn't exist simply carry on
cbb();
} else delcbb();
});
},
function (delcbb) {
self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, delcbb);
},
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
delcbb();
} else {
delcbb('Partition should not exist');
}
});
}],
cbb);
} else {
cbb();
}
}, callback);
};

HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) {
var self = this;
// Do the rebalance.....
Expand Down Expand Up @@ -291,52 +331,13 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) {
// Release current partitions
function (callback) {
debug('HighLevelConsumer %s releasing current partitions during rebalance', self.id);
async.eachSeries(oldTopicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
async.series([
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
// Partition doesn't exist simply carry on
cbb();
} else delcbb();
});
},
function (delcbb) {
self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
delcbb(err);
} else delcbb();
});
},
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
delcbb();
} else {
delcbb('Partition should not exist');
}
});
}],
function (err) {
if (err) cbb(err);
else cbb();
});
} else {
cbb();
}
}, function (err) {
if (err) {
callback(err);
} else {
callback();
}
});
self._releasePartitions(oldTopicPayloads, callback);
},

// Reblannce
function (callback) {
debug('HighLevelConsumer %s determining the partitions to own during rebalance', self.id);
debug('consumerPerTopicMap.consumerTopicMap %j', consumerPerTopicMap.consumerTopicMap);
for (var topic in consumerPerTopicMap.consumerTopicMap[self.id]) {
var topicToAdd = consumerPerTopicMap.consumerTopicMap[self.id][topic];
var numberOfConsumers = consumerPerTopicMap.topicConsumerMap[topicToAdd].length;
Expand Down Expand Up @@ -367,6 +368,7 @@ HighLevelConsumer.prototype.rebalanceAttempt = function (oldTopicPayloads, cb) {
});
}
}
debug('newTopicPayloads %j', newTopicPayloads);
callback();
},

Expand Down Expand Up @@ -500,6 +502,7 @@ HighLevelConsumer.prototype.fetch = function () {
};

HighLevelConsumer.prototype.fetchOffset = function (payloads, cb) {
debug('in fetchOffset %s payloads: %j', this.id, payloads);
this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb);
};

Expand All @@ -515,11 +518,12 @@ HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) {
* @param {Client~failedToRegisterConsumerCallback} cb A function to call the consumer has been registered
*/
HighLevelConsumer.prototype.registerConsumer = function (cb) {
var self = this;
this.client.zk.registerConsumer(this.options.groupId, this.id, this.payloads, function (err) {
if (err) return cb(err);
self.client.zk.listConsumers(self.options.groupId);
cb();
});
this.client.zk.listConsumers(this.options.groupId);
};

HighLevelConsumer.prototype.addTopics = function (topics, cb) {
Expand Down Expand Up @@ -561,25 +565,52 @@ HighLevelConsumer.prototype.removeTopics = function (topics, cb) {
this.client.removeTopicMetadata(topics, cb);
};

HighLevelConsumer.prototype._leaveGroup = function (cb) {
var self = this;
async.parallel([
function (callback) {
if (self.topicPayloads.length) {
self._releasePartitions(self.topicPayloads, callback);
} else {
callback(null);
}
},
function (callback) {
self.client.zk.unregisterConsumer(self.options.groupId, self.id, callback);
}
], cb);
};

HighLevelConsumer.prototype.close = function (force, cb) {
var self = this;
this.ready = false;
this.closing = true;
clearInterval(this.checkPartitionOwnershipInterval);

if (typeof force === 'function') {
cb = force;
force = false;
}

if (force) {
this.commit(force, function (err) {
if (err) {
return cb(err);
async.series([
function (callback) {
self._leaveGroup(callback);
},
function (callback) {
if (force) {
async.series([
function (callback) {
self.commit(true, callback);
},
function (callback) {
self.client.close(callback);
}
], callback);
return;
}
this.client.close(cb);
}.bind(this));
} else {
this.client.close(cb);
}
self.client.close(callback);
}
], cb);
};

HighLevelConsumer.prototype.stop = function (cb) {
Expand Down
Loading