
Add create topics support #958

Merged
merged 13 commits into from
Jun 10, 2018
33 changes: 19 additions & 14 deletions README.md
@@ -183,29 +183,34 @@ producer.on('error', function (err) {})
```
> ⚠️**WARNING**: Batch multiple messages of the same topic/partition together as an array on the `messages` attribute otherwise you may lose messages!

-### createTopics(topics, async, cb)
-This method is used to create topics on the Kafka server. It only works when `auto.create.topics.enable`, on the Kafka server, is set to true. Our client simply sends a metadata request to the server which will auto create topics. When `async` is set to false, this method does not return until all topics are created, otherwise it returns immediately.
+### createTopics(topics, cb)
+This method is used to create topics on the Kafka server. It requires Kafka 0.10+.

* `topics`: **Array**, array of topics
-* `async`: **Boolean**, async or sync
* `cb`: **Function**, the callback

Example:

``` js
-var kafka = require('kafka-node'),
-    Producer = kafka.Producer,
-    client = new kafka.Client(),
-    producer = new Producer(client);
-// Create topics sync
-producer.createTopics(['t','t1'], false, function (err, data) {
-  console.log(data);
-});
-// Create topics async
-producer.createTopics(['t'], true, function (err, data) {});
-producer.createTopics(['t'], function (err, data) {}); // Simply omit 2nd arg
+var kafka = require('kafka-node');
+var client = new kafka.KafkaClient();
+
+var topicsToCreate = [{
+  topic: 'topic1',
+  partitions: 1,
+  replicationFactor: 2
+},
+{
+  topic: 'topic2',
+  partitions: 5,
+  replicationFactor: 3
+}];
+
+client.createTopics(topicsToCreate, (error, result) => {
+  // result is an array of any errors if a given topic could not be created
+});
```
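The callback's `result` array is described above as containing per-topic errors; a small sketch of interpreting it (the `{ topic, error }` entry shape is an assumption for illustration, not confirmed by this diff):

```javascript
// Sketch: summarize a createTopics callback result.
// NOTE: the { topic, error } entry shape is assumed, not confirmed by this PR.
function reportCreateResult(error, result) {
  if (error) return 'request failed: ' + error.message;
  if (!result || result.length === 0) return 'all topics created';
  return result.map(function (r) { return r.topic + ': ' + r.error; }).join('; ');
}

console.log(reportCreateResult(null, []));
console.log(reportCreateResult(null, [{ topic: 'topic1', error: 'invalid partitions' }]));
```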

## HighLevelProducer
### HighLevelProducer(client, [options], [customPartitioner])
8 changes: 8 additions & 0 deletions lib/admin.js
@@ -42,4 +42,12 @@ Admin.prototype.describeGroups = function (consumerGroups, cb) {
this.client.getDescribeGroups(consumerGroups, cb);
};

Admin.prototype.createTopics = function (topics, cb) {
if (!this.ready) {
this.once('ready', () => this.client.createTopics(topics, cb));
return;
}
this.client.createTopics(topics, cb);
};

module.exports = Admin;
18 changes: 18 additions & 0 deletions lib/errors/NotControllerError.js
@@ -0,0 +1,18 @@
var util = require('util');

/**
* The request was sent to a broker that was not the controller.
*
* @param {*} message A message describing the issue.
*
* @constructor
*/
var NotController = function (message) {
Error.captureStackTrace(this, this);
this.message = message;
};

util.inherits(NotController, Error);
NotController.prototype.name = 'NotController';

module.exports = NotController;
177 changes: 177 additions & 0 deletions lib/kafkaClient.js
@@ -15,6 +15,7 @@ const BrokerWrapper = require('./wrapper/BrokerWrapper');
const errors = require('./errors');
const validateConfig = require('./utils').validateConfig;
const TimeoutError = require('./errors/TimeoutError');
const NotControllerError = require('./errors/NotControllerError');
const protocol = require('./protocol');
const protocolVersions = require('./protocol/protocolVersions');
const baseProtocolVersions = protocolVersions.baseSupport;
@@ -67,6 +68,7 @@ const KafkaClient = function (options) {
this._socketId = 0;
this.cbqueue = {};
this.brokerMetadata = {};
this.clusterMetadata = {};
this.ready = false;

this.initialHosts = parseHostList(this.options.kafkaHost);
@@ -241,6 +243,48 @@ KafkaClient.prototype.connectToBroker = function (broker, callback) {
}, timeout);
};

KafkaClient.prototype.getController = function (callback) {
// Use the cached controller id only if we still hold metadata for that broker;
// a stale id (e.g. after the cluster shrank) would otherwise dereference undefined
if (this.clusterMetadata.controllerId != null) {
var controller = this.brokerMetadata[this.clusterMetadata.controllerId];
if (controller) {
var broker = this.getBroker(controller.host, controller.port);

return callback(null, broker);
}
}

// If cached controller is not available, refresh metadata
this.loadMetadata((error, result) => {
if (error) {
return callback(error);
}

// No controller will be available if api version request timed out, or if kafka version is less than 0.10.
if (!result[1].clusterMetadata || !result[1].clusterMetadata.controllerId) {
return callback(new errors.BrokerNotAvailableError('Controller broker not available'));
}

this.updateMetadatas(result);

var controllerId = result[1].clusterMetadata.controllerId;
var controllerMetadata = result[0][controllerId];

var broker = this.getBroker(controllerMetadata.host, controllerMetadata.port);

if (!broker || !broker.isConnected()) {
return callback(new errors.BrokerNotAvailableError('Controller broker not available'));
}

return callback(null, broker);
});
};
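`getController` checks the cached `controllerId` first and only refreshes metadata on a miss. The cache-or-refresh control flow, reduced to a self-contained sketch (`makeControllerCache` and the fake refresh are illustrative, not library API):

```javascript
// Illustrative cache-or-refresh lookup mirroring getController's control flow.
function makeControllerCache(refresh) {
  let controllerId = null;
  return {
    // Mirrors setControllerId(null) after a NotControllerError.
    invalidate() { controllerId = null; },
    get(cb) {
      if (controllerId !== null) return cb(null, controllerId); // cache hit
      refresh((err, id) => {                                    // cache miss
        if (err) return cb(err);
        if (id == null) return cb(new Error('Controller broker not available'));
        controllerId = id;
        cb(null, id);
      });
    }
  };
}

let refreshes = 0;
const lookup = makeControllerCache(cb => { refreshes += 1; cb(null, 1001); });
let lastId;
lookup.get((err, id) => { lastId = id; }); // miss: triggers refresh
lookup.get((err, id) => { lastId = id; }); // hit: served from cache
```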

KafkaClient.prototype.getBroker = function (host, port, longpolling) {
const brokers = this.getBrokers();

var addr = host + ':' + port;
return brokers[addr] || this.setupBroker(host, port, longpolling, brokers);
};

KafkaClient.prototype.setupBroker = function (host, port, longpolling, brokers) {
var brokerKey = host + ':' + port;
brokers[brokerKey] = this.createBroker(host, port, longpolling);
@@ -346,6 +390,22 @@ KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) {
}
};

KafkaClient.prototype.setClusterMetadata = function (clusterMetadata) {
assert(clusterMetadata, 'clusterMetadata is empty');
this.clusterMetadata = clusterMetadata;
};

KafkaClient.prototype.setControllerId = function (controllerId) {
if (!this.clusterMetadata) {
this.clusterMetadata = {
controllerId
};

return;
}
this.clusterMetadata.controllerId = controllerId;
};

KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadata) {
assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect');
logger.debug('updating metadatas');
@@ -355,6 +415,10 @@ KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadat
} else {
_.extend(this.topicMetadata, metadatas[1].metadata);
}

if (metadatas[1].clusterMetadata) {
this.setClusterMetadata(metadatas[1].clusterMetadata);
}
};
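`updateMetadatas` receives a two-element tuple: broker metadata at index 0, and an object holding topic metadata (plus, on 0.10+, `clusterMetadata`) at index 1. A reduced sketch of the merge-vs-replace behavior, using `Object.assign` in place of lodash's `_.extend` (the state object here is a stand-in for the client):

```javascript
// Sketch of updateMetadatas: metadatas[0] = broker metadata,
// metadatas[1] = { clusterMetadata?, metadata }.
function updateMetadatas(state, metadatas, replaceTopicMetadata) {
  state.brokerMetadata = metadatas[0];
  if (replaceTopicMetadata) {
    state.topicMetadata = metadatas[1].metadata;            // replace wholesale
  } else {
    Object.assign(state.topicMetadata, metadatas[1].metadata); // merge in
  }
  if (metadatas[1].clusterMetadata) {
    state.clusterMetadata = metadatas[1].clusterMetadata;   // only set when present
  }
  return state;
}

const state = { brokerMetadata: {}, topicMetadata: { old: 1 }, clusterMetadata: {} };
updateMetadatas(
  state,
  [{ 1001: {} }, { clusterMetadata: { controllerId: 1001 }, metadata: { t1: {} } }],
  false
);
```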

KafkaClient.prototype.brokerForLeader = function (leader, longpolling) {
@@ -693,6 +757,74 @@ KafkaClient.prototype.clearCallbackQueue = function (socket, error) {
delete this.cbqueue[socketId];
};

/**
* Fetches metadata for brokers and cluster.
* This includes an array containing each node (id, host and port).
* Depending on the Kafka version, additional cluster information is available (controller id).
* @param {loadMetadataCallback} cb Function to call once metadata is loaded.
*/
KafkaClient.prototype.loadMetadata = function (callback) {
this.loadMetadataForTopics(null, callback);
};

/**
* Fetches metadata for brokers and cluster.
* This includes an array containing each node (id, host and port), as well as an object
* containing the topic name, partition, leader number, replica count, and in-sync replicas per partition.
* Depending on the Kafka version, additional cluster information is available (controller id).
* @param {Array} topics List of topics to fetch metadata for. An empty array ([]) will fetch all topics.
* @param {loadMetadataCallback} callback Function to call once metadata is loaded.
*/
KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) {
var broker = this.brokerForLeader();

if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return callback(new errors.BrokerNotAvailableError('Broker not available'));
}

const ensureBrokerReady = (broker, cb) => {
if (!broker.isReady()) {
logger.debug('missing apiSupport waiting until broker is ready...');
this.waitUntilReady(broker, cb);
} else {
cb(null);
}
};

async.series([
cb => {
ensureBrokerReady(broker, cb);
},
cb => {
var correlationId = this.nextId();
var supportedCoders = getSupportedForRequestType(broker, 'metadata');
var request = supportedCoders.encoder(this.clientId, correlationId, topics);

this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]);
broker.write(request);
}
], (err, result) => {
callback(err, result[1]);
});
};
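`loadMetadataForTopics` runs two steps in series — wait until the broker's API support is known, then queue the metadata request — and surfaces only the second step's result to the caller. The shape of that flow, with a minimal series helper in place of the `async` library (the step bodies are stand-ins):

```javascript
// Minimal series helper standing in for async.series: run callback-style
// steps in order, collect results, stop on the first error.
function series(steps, done) {
  const results = [];
  (function next(i) {
    if (i === steps.length) return done(null, results);
    steps[i]((err, res) => {
      if (err) return done(err, results);
      results.push(res);
      next(i + 1);
    });
  })(0);
}

let metadataResult;
series([
  cb => cb(null),                        // stands in for ensureBrokerReady
  cb => cb(null, [{}, { metadata: {} }]) // stands in for the decoded metadata response
], (err, result) => {
  metadataResult = result[1];            // only the second step's result is surfaced
});
```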

/**
* Creates one or more topics.
* @param {Array} topics Array of topics with partition and replication factor to create.
* @param {createTopicsCallback} callback Function to call once operation is completed.
*/
KafkaClient.prototype.createTopics = function (topics, callback) {
// Calls with [string, string, ...] are forwarded to support previous versions
if (topics.every(t => typeof t === 'string')) {
return Client.prototype.createTopics.apply(this, arguments);
}

const encoder = protocol.encodeCreateTopicRequest;
const decoder = protocol.decodeCreateTopicResponse;

this.sendControllerRequest(encoder, decoder, [topics, this.options.requestTimeout], callback);
};
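The new `createTopics` keeps backwards compatibility by dispatching on argument shape: an all-strings array is forwarded to the legacy `Client` implementation, while object entries go through the 0.10 CreateTopics request. The dispatch predicate in isolation:

```javascript
// Dispatch predicate used by KafkaClient.createTopics: strings mean the
// legacy API, objects mean the Kafka 0.10+ CreateTopics request.
function isLegacyTopicList(topics) {
  return topics.every(t => typeof t === 'string');
}
```

Note that `Array.prototype.every` returns `true` for an empty array, so an empty `topics` list is routed down the legacy path.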

KafkaClient.prototype.topicExists = function (topics, callback) {
this.loadMetadataForTopics([], (error, response) => {
if (error) {
@@ -870,6 +1002,51 @@ KafkaClient.prototype.verifyPayloadsHasLeaders = function (payloads, callback) {
});
};

KafkaClient.prototype.wrapControllerCheckIfNeeded = function (encoder, decoder, encoderArgs, callback) {
if (callback.isControllerWrapper) {
return callback;
}

var hasBeenInvoked = false;

const wrappedCallback = (error, result) => {
if (error instanceof NotControllerError) {
this.setControllerId(null);

if (!hasBeenInvoked) {
hasBeenInvoked = true;
this.sendControllerRequest(encoder, decoder, encoderArgs, wrappedCallback);
return;
}
}

callback(error, result);
};

wrappedCallback.isControllerWrapper = true;

return wrappedCallback;
};
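The wrapper bounds retries to a single resend: a `NotControllerError` triggers one retry against a re-resolved controller, and any further `NotControllerError` falls through to the caller. A self-contained sketch of that control flow (`wrapControllerCheck` and `send` are illustrative names; the real method additionally clears the cached controller id via `setControllerId(null)` before retrying):

```javascript
function NotControllerError(message) { this.message = message; }

// Illustrative single-retry wrapper with the same control flow as
// wrapControllerCheckIfNeeded: a NotControllerError triggers exactly one resend.
function wrapControllerCheck(resend, callback) {
  if (callback.isControllerWrapper) return callback; // never double-wrap
  let hasBeenInvoked = false;
  const wrapped = (error, result) => {
    if (error instanceof NotControllerError && !hasBeenInvoked) {
      hasBeenInvoked = true;
      resend(wrapped); // retry once; a second NotControllerError falls through
      return;
    }
    callback(error, result);
  };
  wrapped.isControllerWrapper = true;
  return wrapped;
}

// Simulate a stale cached controller: the first send fails, the second succeeds.
let attempts = 0;
function send(cb) {
  attempts += 1;
  if (attempts === 1) return cb(new NotControllerError('not the controller'));
  cb(null, 'created');
}
let finalResult;
const cb = wrapControllerCheck(send, (err, res) => { finalResult = res; });
send(cb);
```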

KafkaClient.prototype.sendControllerRequest = function (encoder, decoder, encoderArgs, callback) {
this.getController((error, controller) => {
if (error) {
return callback(error);
}

const originalArgs = _.clone(encoderArgs);
const originalCallback = callback;
const correlationId = this.nextId();
encoderArgs.unshift(this.clientId, correlationId);
const request = encoder.apply(null, encoderArgs);

callback = this.wrapControllerCheckIfNeeded(encoder, decoder, originalArgs, originalCallback);

this.queueCallback(controller.socket, correlationId, [decoder, callback]);
> **Collaborator:** hi @jlandersen thanks for the update. Could you help me understand why `wrapControllerCheckIfNeeded` is needed? Would it not be simpler to just inline a function in place of the callback that checks if the error is an instance of `NotControllerError`, and then set the controller id to null and call `sendControllerRequest` again if it is?
>
> **@jlandersen (Contributor Author, Jun 1, 2018):** The reason was to limit the retry to a single time (or potentially x times). If we add a function in place of the callback that always calls `sendControllerRequest` on a `NotControllerError`, it could keep making requests if the error is returned repeatedly. I am not saying it can or will happen, but it was rather to be safe. Let me know if you prefer otherwise and I'll put the change in!
controller.write(request);
});
};

KafkaClient.prototype.sendFetchRequest = function (
consumer,
payloads,