Skip to content

Commit

Permalink
Add create topics support (#958)
Browse files Browse the repository at this point in the history
* Add create topics support

Adds support for sending a create topics request as a real protocol request. This supports specifying number of partitions and replication factor.
Explicit replica assignments and config entries are left out for now.

A create topic request must be sent to the controller. To find the current controller, this also adds a version 1 metadata request, which includes the controller Id in the response.

* Handle different error message in kafka versions

Error message changes a bit depending on version. This relaxes the assertion a bit.

* Remove metadata api support version

metadata v1 is not supported until 0.10

* Return error if controller cannot be found

Happens when the cluster is running a version less than 0.10

* Parse partition metadata correctly for v1

The order resulted in isInternal being used as the number of partitions during decoding.

* Address PR feedback

- Add createTopics alias on admin client
- Remove V2 suffix, replacing old versions of createTopics and loadMetadataForTopics on KafkaClient
- Add loadMetadata function
- Use supported API to resolve metadata request version

* Encode null value properly

* Forward old createTopics calls to previous impl

* Update README with new createTopics function

* Update cached metadata in getController

* Remove unused sandbox

* Add additional assertion of createTopic test

Now fetches metadata again to verify topics were created with requested partition count and replication factor.

* Add sendControllerRequest method

Wraps the provided callback with retry logic if NotController error was returned.
In this case the cached controller is cleared and it is fetched again by getting the latest metadata.
  • Loading branch information
jlandersen authored and hyperlink committed Jun 10, 2018
1 parent eff1e60 commit a3653ba
Show file tree
Hide file tree
Showing 9 changed files with 624 additions and 20 deletions.
33 changes: 19 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,29 +183,34 @@ producer.on('error', function (err) {})
```
> ⚠️**WARNING**: Batch multiple messages of the same topic/partition together as an array on the `messages` attribute otherwise you may lose messages!
### createTopics(topics, async, cb)
This method is used to create topics on the Kafka server. It only works when `auto.create.topics.enable`, on the Kafka server, is set to true. Our client simply sends a metadata request to the server which will auto create topics. When `async` is set to false, this method does not return until all topics are created, otherwise it returns immediately.
### createTopics(topics, cb)
This method is used to create topics on the Kafka server. It requires Kafka 0.10+.

* `topics`: **Array**, array of topics
* `async`: **Boolean**, async or sync
* `cb`: **Function**, the callback

Example:

``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
// Create topics sync
producer.createTopics(['t','t1'], false, function (err, data) {
console.log(data);
var kafka = require('kafka-node');
var client = new kafka.KafkaClient();

var topicsToCreate = [{
topic: 'topic1',
partitions: 1,
replicationFactor: 2
},
{
topic: 'topic2',
partitions: 5,
replicationFactor: 3
}];

client.createTopics(topicsToCreate, (error, result) => {
// result is an array of any errors if a given topic could not be created
});
// Create topics async
producer.createTopics(['t'], true, function (err, data) {});
producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
```

```

## HighLevelProducer
### HighLevelProducer(client, [options], [customPartitioner])
Expand Down
8 changes: 8 additions & 0 deletions lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ Admin.prototype.describeGroups = function (consumerGroups, cb) {
this.client.getDescribeGroups(consumerGroups, cb);
};

// Create topics via the underlying client, deferring until the admin
// client has emitted 'ready' if the connection is not yet established.
Admin.prototype.createTopics = function (topics, cb) {
  const forward = () => this.client.createTopics(topics, cb);
  if (this.ready) {
    forward();
  } else {
    this.once('ready', forward);
  }
};

module.exports = Admin;
18 changes: 18 additions & 0 deletions lib/errors/NotControllerError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var util = require('util');

/**
 * The request was sent to a broker that was not the controller.
 *
 * @param {*} message A message describing the issue.
 *
 * @constructor
 */
var NotController = function (message) {
  // Pass the constructor (not `this`) so the captured stack trace
  // starts at the throw site and omits this constructor frame.
  Error.captureStackTrace(this, NotController);
  this.message = message;
};

util.inherits(NotController, Error);
NotController.prototype.name = 'NotController';

module.exports = NotController;
177 changes: 177 additions & 0 deletions lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const BrokerWrapper = require('./wrapper/BrokerWrapper');
const errors = require('./errors');
const validateConfig = require('./utils').validateConfig;
const TimeoutError = require('./errors/TimeoutError');
const NotControllerError = require('./errors/NotControllerError');
const protocol = require('./protocol');
const protocolVersions = require('./protocol/protocolVersions');
const baseProtocolVersions = protocolVersions.baseSupport;
Expand Down Expand Up @@ -67,6 +68,7 @@ const KafkaClient = function (options) {
this._socketId = 0;
this.cbqueue = {};
this.brokerMetadata = {};
this.clusterMetadata = {};
this.ready = false;

this.initialHosts = parseHostList(this.options.kafkaHost);
Expand Down Expand Up @@ -241,6 +243,48 @@ KafkaClient.prototype.connectToBroker = function (broker, callback) {
}, timeout);
};

/**
 * Resolves the broker acting as the cluster controller, using the cached
 * controller id when possible and refreshing metadata otherwise.
 * @param {Function} callback Called with (error, brokerWrapper).
 */
KafkaClient.prototype.getController = function (callback) {
  // Check for cached controller. Compare against null/undefined explicitly:
  // a controllerId of 0 is a valid broker id and must not be treated as "unset".
  if (this.clusterMetadata && this.clusterMetadata.controllerId != null) {
    var cached = this.brokerMetadata[this.clusterMetadata.controllerId];
    if (cached) {
      return callback(null, this.getBroker(cached.host, cached.port));
    }
    // Cached id no longer maps to a known broker; fall through and refresh.
  }

  // If cached controller is not available, refresh metadata
  this.loadMetadata((error, result) => {
    if (error) {
      return callback(error);
    }

    // No controller will be available if the api version request timed out,
    // or if the kafka version is less than 0.10.
    if (!result[1].clusterMetadata || result[1].clusterMetadata.controllerId == null) {
      return callback(new errors.BrokerNotAvailableError('Controller broker not available'));
    }

    this.updateMetadatas(result);

    var controllerId = result[1].clusterMetadata.controllerId;
    var controllerMetadata = result[0][controllerId];

    // The reported controller id may reference a broker missing from the
    // broker list (e.g. metadata raced with a broker leaving the cluster).
    if (!controllerMetadata) {
      return callback(new errors.BrokerNotAvailableError('Controller broker not available'));
    }

    var broker = this.getBroker(controllerMetadata.host, controllerMetadata.port);

    if (!broker || !broker.isConnected()) {
      return callback(new errors.BrokerNotAvailableError('Controller broker not available'));
    }

    return callback(null, broker);
  });
};

/**
 * Looks up (or lazily creates) the broker wrapper for the given host/port.
 * @param {String} host Broker hostname.
 * @param {Number} port Broker port.
 * @param {Boolean} [longpolling] Whether a long-polling broker is wanted.
 * @returns {BrokerWrapper} The existing or newly set up broker.
 */
KafkaClient.prototype.getBroker = function (host, port, longpolling) {
  // Forward `longpolling` to getBrokers so a long-polling lookup cannot
  // return a broker from the regular (non long-polling) pool.
  const brokers = this.getBrokers(longpolling);

  var addr = host + ':' + port;
  return brokers[addr] || this.setupBroker(host, port, longpolling, brokers);
};

KafkaClient.prototype.setupBroker = function (host, port, longpolling, brokers) {
var brokerKey = host + ':' + port;
brokers[brokerKey] = this.createBroker(host, port, longpolling);
Expand Down Expand Up @@ -346,6 +390,22 @@ KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) {
}
};

// Replace the cached cluster metadata wholesale; a falsy value is a
// programming error, so fail fast.
KafkaClient.prototype.setClusterMetadata = function (metadata) {
  assert(metadata, 'clusterMetadata is empty');
  this.clusterMetadata = metadata;
};

// Record the current controller id, creating the cluster metadata
// container if it does not exist yet. Passing null clears the cache.
KafkaClient.prototype.setControllerId = function (controllerId) {
  if (this.clusterMetadata) {
    this.clusterMetadata.controllerId = controllerId;
  } else {
    this.clusterMetadata = { controllerId };
  }
};

KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadata) {
assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect');
logger.debug('updating metadatas');
Expand All @@ -355,6 +415,10 @@ KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadat
} else {
_.extend(this.topicMetadata, metadatas[1].metadata);
}

if (metadatas[1].clusterMetadata) {
this.setClusterMetadata(metadatas[1].clusterMetadata);
}
};

KafkaClient.prototype.brokerForLeader = function (leader, longpolling) {
Expand Down Expand Up @@ -693,6 +757,74 @@ KafkaClient.prototype.clearCallbackQueue = function (socket, error) {
delete this.cbqueue[socketId];
};

/**
 * Fetches metadata for brokers and cluster.
 * This includes an array containing each node (id, host and port).
 * Depending on kafka version, additional cluster information is available (controller id).
 * @param {loadMetadataCallback} callback Function to call once metadata is loaded.
 */
KafkaClient.prototype.loadMetadata = function (callback) {
  this.loadMetadataForTopics(null, callback);
};

/**
 * Fetches metadata for brokers and cluster.
 * This includes an array containing each node (id, host and port). As well as an object
 * containing the topic name, partition, leader number, replica count, and in sync replicas per partition.
 * Depending on kafka version, additional cluster information is available (controller id).
 * @param {Array} topics List of topics to fetch metadata for. An empty array ([]) will fetch all topics.
 * @param {loadMetadataCallback} callback Function to call once metadata is loaded.
 */
KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) {
  var broker = this.brokerForLeader();

  if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
    return callback(new errors.BrokerNotAvailableError('Broker not available'));
  }

  // Waits for api version support to be negotiated before issuing the request,
  // since the metadata request version depends on it.
  const ensureBrokerReady = (broker, cb) => {
    if (!broker.isReady()) {
      logger.debug('missing apiSupport waiting until broker is ready...');
      this.waitUntilReady(broker, cb);
    } else {
      cb(null);
    }
  };

  async.series([
    cb => {
      ensureBrokerReady(broker, cb);
    },
    cb => {
      var correlationId = this.nextId();
      var supportedCoders = getSupportedForRequestType(broker, 'metadata');
      var request = supportedCoders.encoder(this.clientId, correlationId, topics);

      this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]);
      broker.write(request);
    }
  ], (err, result) => {
    // On failure, do not index into the results array — async.series may not
    // have populated the metadata slot when an earlier step errored.
    if (err) {
      return callback(err);
    }
    callback(null, result[1]);
  });
};

/**
 * Creates one or more topics.
 * @param {Array} topics Array of topics with partition and replication factor to create.
 * @param {createTopicsCallback} callback Function to call once operation is completed.
 */
KafkaClient.prototype.createTopics = function (topics, callback) {
  // Calls with [string, string, ...] are forwarded to support previous versions
  const isLegacyFormat = topics.every(topic => typeof topic === 'string');
  if (isLegacyFormat) {
    return Client.prototype.createTopics.apply(this, arguments);
  }

  this.sendControllerRequest(
    protocol.encodeCreateTopicRequest,
    protocol.decodeCreateTopicResponse,
    [topics, this.options.requestTimeout],
    callback
  );
};

KafkaClient.prototype.topicExists = function (topics, callback) {
this.loadMetadataForTopics([], (error, response) => {
if (error) {
Expand Down Expand Up @@ -870,6 +1002,51 @@ KafkaClient.prototype.verifyPayloadsHasLeaders = function (payloads, callback) {
});
};

// Wraps `callback` so that a NotControllerError triggers exactly one retry
// against a freshly resolved controller; already-wrapped callbacks are
// returned unchanged so nested calls do not stack retries.
KafkaClient.prototype.wrapControllerCheckIfNeeded = function (encoder, decoder, encoderArgs, callback) {
  if (callback.isControllerWrapper) {
    return callback;
  }

  let retried = false;

  const wrappedCallback = (error, result) => {
    if (error instanceof NotControllerError) {
      // Our cached controller is stale; clear it so the retry re-resolves it.
      this.setControllerId(null);

      if (!retried) {
        retried = true;
        return this.sendControllerRequest(encoder, decoder, encoderArgs, wrappedCallback);
      }
    }

    callback(error, result);
  };

  wrappedCallback.isControllerWrapper = true;

  return wrappedCallback;
};

/**
 * Encodes and sends a request to the current controller broker, retrying once
 * (via wrapControllerCheckIfNeeded) if the broker reports it is no longer the
 * controller.
 * @param {Function} encoder Protocol encoder; called with (clientId, correlationId, ...encoderArgs).
 * @param {Function} decoder Protocol decoder for the response.
 * @param {Array} encoderArgs Request-specific encoder arguments.
 * @param {Function} callback Called with (error, result).
 */
KafkaClient.prototype.sendControllerRequest = function (encoder, decoder, encoderArgs, callback) {
  this.getController((error, controller) => {
    if (error) {
      return callback(error);
    }

    const correlationId = this.nextId();
    // Build the full argument list without mutating `encoderArgs`: the retry
    // wrapper must receive the caller's original, un-prefixed arguments.
    const request = encoder.apply(null, [this.clientId, correlationId].concat(encoderArgs));
    const wrappedCallback = this.wrapControllerCheckIfNeeded(encoder, decoder, encoderArgs, callback);

    this.queueCallback(controller.socket, correlationId, [decoder, wrappedCallback]);
    controller.write(request);
  });
};

KafkaClient.prototype.sendFetchRequest = function (
consumer,
payloads,
Expand Down
Loading

0 comments on commit a3653ba

Please sign in to comment.