Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix create topics sync vs async #519

Merged
merged 2 commits into from
Nov 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -357,48 +357,53 @@ Client.prototype.createTopics = function (topics, isAsync, cb) {
cb = isAsync;
isAsync = true;
}

try {
validateKafkaTopics(topics);
} catch (e) {
if (isAsync) return cb(e);
throw e;
}
var self = this;
// first, load metadata to create topics
this.loadMetadataForTopics(topics, function (err, resp) {
if (err) return cb(err);
if (isAsync) return cb(null, 'All requests sent');
var topicMetadata = resp[1].metadata;
// ommit existed topics
var existed = Object.keys(topicMetadata);
var topicsNotExists = topics.filter(function (topic) {
return !~existed.indexOf(topic);

cb = _.once(cb);

const getTopicsFromKafka = (topics, callback) => {
this.loadMetadataForTopics(topics, function (error, resp) {
if (error) {
return callback(error);
}
callback(null, Object.keys(resp[1].metadata));
});
};

function attemptCreateTopics (topics, cb) {
var operation = retry.operation({ minTimeout: 200, maxTimeout: 2000 });
operation.attempt(function (currentAttempt) {
debug('create topics currentAttempt', currentAttempt);
self.loadMetadataForTopics(topics, function (err, resp) {
if (resp) {
var topicMetadata = resp[1].metadata;
var created = Object.keys(topicMetadata).length === topics.length;
if (!created) err = new Error('Topic creation pending');
}
if (operation.retry(err)) {
return;
}

cb(err, 'All created');
});
});
}
const operation = retry.operation({ minTimeout: 200, maxTimeout: 2000 });

if (!topicsNotExists.length) return cb(null, 'All created');
operation.attempt(currentAttempt => {
debug('create topics currentAttempt', currentAttempt);
getTopicsFromKafka(topics, function (error, kafkaTopics) {
if (error) {
if (operation.retry(error)) {
return;
}
}

debug('kafka reported topics', kafkaTopics);
const left = _.difference(topics, kafkaTopics);
if (left.length === 0) {
debug(`Topics created ${kafkaTopics}`);
return cb(null);
}

debug('create topic by sending metadata request');
attemptCreateTopics(topicsNotExists, cb);
debug(`Topics left ${left.join(', ')}`);
if (!operation.retry(new Error(`Topics not created ${left}`))) {
cb(operation.mainError());
}
});
});

if (!isAsync) {
cb(null);
}
};

/**
Expand Down
2 changes: 1 addition & 1 deletion run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
source start-docker.sh
export KAFKA_TEST_HOST=$DOCKER_VM_IP
echo "KAFKA_TEST_HOST: $KAFKA_TEST_HOST"
./node_modules/.bin/istanbul cover _mocha -- -t 15000 test/**/test.*js test/test.*js
./node_modules/.bin/istanbul cover _mocha -- -t 20000 test/**/test.*js test/test.*js
58 changes: 58 additions & 0 deletions test/test.client.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
'use strict';

var host = process.env['KAFKA_TEST_HOST'] || '';
var kafka = require('..');
var Client = kafka.Client;
Expand All @@ -9,6 +11,8 @@ var InvalidConfigError = require('../lib/errors/InvalidConfigError');
var proxyquire = require('proxyquire').noCallThru();
var sinon = require('sinon');
var retry = require('retry');
const _ = require('lodash');
const async = require('async');

describe('Client', function () {
var client = null;
Expand Down Expand Up @@ -368,6 +372,60 @@ describe('Client', function () {
});
});

describe('#createTopics', function () {
function verifyTopics (topics, callback) {
async.each(topics, function (topic, callback) {
client.zk.topicExists(topic, function (error, exists, topic) {
if (error) {
return callback(error);
}
exists.should.be.true;
callback();
});
}, callback);
}

it('should create given kafka topics', function (done) {
const topics = _.times(3, uuid.v4);
client.createTopics(topics, true, function (error) {
if (error) {
return done(error);
}
verifyTopics(topics, done);
});
});

it('should yield synchronously', function (done) {
let called = false;
const topics = _.times(3, uuid.v4);
client.createTopics(topics, false, function (error) {
if (error) {
return done(error);
}
if (!called) {
return done();
}
done('Called asynchronously');
});
called = true;
});

it('should yield asynchronously', function (done) {
let called = false;
const topics = _.times(3, uuid.v4);
client.createTopics(topics, true, function (error) {
if (error) {
return done(error);
}
if (called) {
return done();
}
done('Called synchronously');
});
called = true;
});
});

describe('#reconnectBroker', function () {
var emptyFn = function () {};

Expand Down
17 changes: 6 additions & 11 deletions test/test.consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,13 @@ describe('Consumer', function () {
EXISTS_TOPIC_2,
EXISTS_GZIP,
EXISTS_SNAPPY
], false, function (err, created) {
], true, function (err) {
if (err) return done(err);

function useNewTopics () {
producer.send([
{ topic: EXISTS_TOPIC_2, messages: 'hello kafka' },
{ topic: EXISTS_GZIP, messages: 'hello gzip', attributes: 1 },
{ topic: EXISTS_SNAPPY, messages: SNAPPY_MESSAGE, attributes: 2 }
], done);
}
// Ensure leader selection happened
setTimeout(useNewTopics, 1000);
producer.send([
{ topic: EXISTS_TOPIC_2, messages: 'hello kafka' },
{ topic: EXISTS_GZIP, messages: 'hello gzip', attributes: 1 },
{ topic: EXISTS_SNAPPY, messages: SNAPPY_MESSAGE, attributes: 2 }
], done);
});
});
});
Expand Down
44 changes: 2 additions & 42 deletions test/test.highlevelProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ var client, producer, noAckProducer, producerKeyed;

var host = process.env['KAFKA_TEST_HOST'] || '';

// Helper method
function randomId () {
return Math.floor(Math.random() * 10000);
}

[
{
name: 'PLAINTEXT HighLevelProducer'
Expand Down Expand Up @@ -45,9 +40,9 @@ function randomId () {

producer.on('ready', function () {
producerKeyed.on('ready', function () {
producer.createTopics([EXISTS_TOPIC_3], false, function (err, created) {
producer.createTopics([EXISTS_TOPIC_3], true, function (err) {
if (err) return done(err);
setTimeout(done, 500);
done();
});
});
});
Expand Down Expand Up @@ -151,41 +146,6 @@ function randomId () {
});
});

describe('#createTopics', function () {
var client, producer;

before(function (done) {
client = new Client(host);
producer = new HighLevelProducer(client);
producer.on('ready', done);
});

after(function (done) {
producer.close(done);
});

it('should return All requests sent when async is true', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], true, function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});

it('async should be true if not present', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});

it('should return All created when async is false', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], false, function (err, data) {
data.should.equal('All created');
done(err);
});
});
});

describe('#close', function () {
var client, producer;

Expand Down
2 changes: 1 addition & 1 deletion test/test.offset.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('Offset', function () {
client = new Client(host);
producer = new Producer(client);
producer.on('ready', function () {
producer.createTopics(['_exist_topic_3_test'], false, function (err, created) {
producer.createTopics(['_exist_topic_3_test'], true, function (err) {
done(err);
});
});
Expand Down
44 changes: 2 additions & 42 deletions test/test.producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ var client, producer, noAckProducer, producerKeyed;

var host = process.env['KAFKA_TEST_HOST'] || '';

// Helper method
function randomId () {
return Math.floor(Math.random() * 10000);
}

[
{
name: 'PLAINTEXT Producer'
Expand Down Expand Up @@ -45,9 +40,9 @@ function randomId () {

producer.on('ready', function () {
producerKeyed.on('ready', function () {
producer.createTopics([EXISTS_TOPIC_3], false, function (err, created) {
producer.createTopics([EXISTS_TOPIC_3], true, function (err) {
if (err) return done(err);
setTimeout(done, 500);
done();
});
});
});
Expand Down Expand Up @@ -198,41 +193,6 @@ function randomId () {
});
});

describe('#createTopics', function () {
var client, producer;

before(function (done) {
client = new Client(host);
producer = new Producer(client);
producer.on('ready', done);
});

after(function (done) {
producer.close(done);
});

it('should return All requests sent when async is true', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], true, function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});

it('async should be true if not present', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});

it('should return All created when async is false', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], false, function (err, data) {
data.should.equal('All created');
done(err);
});
});
});

describe('#close', function () {
var client, producer;

Expand Down
4 changes: 2 additions & 2 deletions test/test.producerBatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ describe('No Ack Producer', function () {
producer = new Producer(client);
batchProducer = new Producer(batchClient);
producer.on('ready', function () {
producer.createTopics([EXISTS_TOPIC_4], false, function (err, created) {
producer.createTopics([EXISTS_TOPIC_4], true, function (err) {
if (err) return callback(err);
setTimeout(callback, 500);
callback();
});
broker = Object.keys(client.brokers)[0];
});
Expand Down