Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect directly to kafka brokers using new client closes #666 #691

Merged
merged 28 commits into from
Jun 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3633607
Add initial kafka client and test
hyperlink Apr 18, 2017
2ab8fbf
Expose KafkaClient
hyperlink Apr 18, 2017
cee3b41
Add doc for new KafkaClient
hyperlink Apr 18, 2017
d37ba7b
Add TimeoutError to broker connect
hyperlink Apr 19, 2017
bbd5278
Add more test to verify timeout
hyperlink Apr 19, 2017
0e231de
Add support to timeout kafka request
hyperlink Apr 20, 2017
0bf7198
Attempt to connect to a list of initial kafka hosts
hyperlink Apr 22, 2017
70416ed
Connect to set of brokers with retries
hyperlink Apr 22, 2017
df6972d
consumerGroup can use kafkaClient
hyperlink Apr 23, 2017
f23fc14
ConsumerGroup should throw if migration is turned on and KafkaClient …
hyperlink May 19, 2017
b6b5a49
double check brokerMetadata after connect
hyperlink May 19, 2017
dc6edb6
add setter for brokerMetadata
hyperlink May 19, 2017
ee1fd9a
Clarify when ready is emitted in KafkaClient
hyperlink May 19, 2017
976020f
Add broker wrapper helper
hyperlink May 19, 2017
74f4248
add getAvailableBroker method and updateMetadata should update broker…
hyperlink May 19, 2017
9d56854
formatting
hyperlink May 19, 2017
2e9252b
Added method to refresh the broker metadata
hyperlink May 22, 2017
3555a8f
Add to ConsumerGroup using Kafka Client to rebalance tests
hyperlink May 25, 2017
32f4a16
remove console.log
hyperlink May 25, 2017
81d1ff2
Call close on test Kafka Clients
hyperlink May 25, 2017
83eeddb
formatting
hyperlink May 25, 2017
9c45d3a
add hooks for refreshMetadata for KafkaClient
hyperlink May 25, 2017
e832d6d
Check instance of Client for HighLevelConsumer enforcing only origina…
hyperlink May 30, 2017
c952bb9
Allow closing the client to abort connection retries
hyperlink May 30, 2017
eacefdc
logging metadata is too spammy.
hyperlink May 30, 2017
0f52b0f
Add KafkaClient test to highLevelProducer tests
hyperlink May 30, 2017
60773e3
Update doc with more details about KafkaClient options.
hyperlink May 30, 2017
b82ada7
Override topicExists method in Kafka Client since the previous implem…
hyperlink Jun 19, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.8.1
- [Features](#features)
- [Install Kafka](#install-kafka)
- [API](#api)
- [KafkaClient](#kafkaclient)
- [Client](#client)
- [Producer](#producer)
- [HighLevelProducer](#highlevelproducer)
Expand Down Expand Up @@ -47,11 +48,37 @@ Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.8.1
* Manage topic Offsets
* SSL connections to brokers (Kafka 0.9+)
* Consumer Groups managed by Kafka coordinator (Kafka 0.9+)
* Connect directly to brokers (Kafka 0.9+)

# Install Kafka
Follow the [instructions](http://kafka.apache.org/documentation.html#quickstart) on the Kafka wiki to build Kafka 0.8 and get a test broker up and running.

# API

## KafkaClient

New KafkaClient connects directly to Kafka brokers instead of connecting to zookeeper for broker discovery.

### New Features

* Kafka **ONLY** no zookeeper
* Added request timeout
* Added connection timeout and retry

### Notable differences

* Constructor accepts an single options object (see below)
* Unlike the original `Client` `KafkaClient` will not emit socket errors it will do it's best to recover and only emit errors when it has exhausted it's recovery attempts
* `ready` event is only emitted after successful connection to a broker and metadata request to that broker
* `Client` uses zookeeper to discover the SSL kafka host/port since we connect directly to the broker this host/port for SSL need to be correct

### Options
* `kafkaHost` : A string of kafka broker/host combination delimited by comma for example: `kafka-1.us-east-1.myapp.com:9093,kafka-2.us-east-1.myapp.com:9093,kafka-3.us-east-1.myapp.com:9093` default: `localhost:9092`.
* `connectTimeout` : in ms it takes to wait for a successful connection before moving to the next host default: `10000`
* `requestTimeout` : in ms for a kafka request to timeout default: `30000`
* `autoConnect` : automatically connect when KafkaClient is instantiated otherwise you need to manually call `connect` default: `true`
* `connectRetryOptions` : object hash that applies to the initial connection. see [retry](https://www.npmjs.com/package/retry) module for these options.

## Client
### Client(connectionString, clientId, [zkOptions], [noAckBatchOptions], [sslOptions])
* `connectionString`: Zookeeper connection string, default `localhost:2181/`
Expand Down Expand Up @@ -565,7 +592,8 @@ API is very similar to `HighLevelConsumer` since it extends directly from HLC so

```js
var options = {
host: 'zookeeper:2181',
host: 'zookeeper:2181', // zookeeper host omit if connecting directly to broker (see kafkaHost below)
kafkaHost: 'broker:9092', // connect directly to kafka broker (instantiates a KafkaClient)
zk : undefined, // put client zk settings if you need them (see Client)
batch: undefined, // put client batch settings if you need them (see Client)
ssl: true, // optional (defaults to false) or tls options hash
Expand Down
1 change: 1 addition & 0 deletions kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ exports.ConsumerGroup = require('./lib/consumerGroup');
exports.Consumer = require('./lib/consumer');
exports.Producer = require('./lib/producer');
exports.Client = require('./lib/client');
exports.KafkaClient = require('./lib/kafkaClient');
exports.Offset = require('./lib/offset');
exports.KeyedMessage = require('./lib/protocol').KeyedMessage;
exports.DefaultPartitioner = require('./lib/partitioner').DefaultPartitioner;
Expand Down
94 changes: 56 additions & 38 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,19 @@ var Client = function (connectionString, clientId, zkOptions, noAckBatchOptions,
util.inherits(Client, events.EventEmitter);

Client.prototype.connect = function () {
var zk = this.zk = new Zookeeper(this.connectionString, this.zkOptions);
var zk = (this.zk = new Zookeeper(this.connectionString, this.zkOptions));
var self = this;
zk.once('init', function (brokers) {
try {
self.ready = true;
self.brokerMetadata = brokers;
self.setupBrokerProfiles(brokers);
Object
.keys(self.brokerProfiles)
.some(function (key, index) {
var broker = self.brokerProfiles[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);
// Only connect one broker
return !index;
});
Object.keys(self.brokerProfiles).some(function (key, index) {
var broker = self.brokerProfiles[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);
// Only connect one broker
return !index;
});
self.emit('ready');
} catch (error) {
self.ready = false;
Expand Down Expand Up @@ -238,7 +236,7 @@ Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeou
var innerSet = encodeMessageSet(payload.messages);
codec.encode(innerSet, function (err, message) {
if (err) return cb(err);
payload.messages = [ new Message(0, attributes, '', message) ];
payload.messages = [new Message(0, attributes, '', message)];
cb();
});
}
Expand Down Expand Up @@ -274,6 +272,8 @@ Client.prototype.sendOffsetRequest = function (payloads, cb) {
this.send(payloads, encoder, decoder, cb);
};

Client.prototype.refreshBrokerMetadata = function () {};

Client.prototype.sendGroupRequest = function (encode, decode, requestArgs) {
requestArgs = _.values(requestArgs);
var cb = requestArgs.pop();
Expand All @@ -285,6 +285,7 @@ Client.prototype.sendGroupRequest = function (encode, decode, requestArgs) {
var broker = this.brokerForLeader(this.coordinatorId);

if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
this.refreshBrokerMetadata();
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}

Expand Down Expand Up @@ -464,11 +465,13 @@ Client.prototype.refreshBrokers = function () {
function closeDeadBrokers (brokers) {
var deadBrokerKeys = _.difference(Object.keys(brokers), validBrokers);
if (deadBrokerKeys.length) {
self.closeBrokers(deadBrokerKeys.map(function (key) {
var broker = brokers[key];
delete brokers[key];
return broker;
}));
self.closeBrokers(
deadBrokerKeys.map(function (key) {
var broker = brokers[key];
delete brokers[key];
return broker;
})
);
}
}

Expand Down Expand Up @@ -511,7 +514,9 @@ Client.prototype.send = function (payloads, encoder, decoder, cb) {
return;
}
if (payloads[1].length) {
var topicNames = payloads[1].map(function (p) { return p.topic; });
var topicNames = payloads[1].map(function (p) {
return p.topic;
});
this.loadMetadataForTopics(topicNames, function (err, resp) {
if (err) {
return cb(err);
Expand All @@ -526,6 +531,7 @@ Client.prototype.send = function (payloads, encoder, decoder, cb) {
// check payloads again
payloads = self.checkMetadatas(_payloads);
if (payloads[1].length) {
this.refreshBrokerMetadata();
return cb(new errors.BrokerNotAvailableError('Could not find the leader'));
}

Expand All @@ -548,6 +554,7 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
var request = encoder(this.clientId, correlationId, payloads[leader]);
var broker = this.brokerForLeader(leader, longpolling);
if (!broker || !broker.socket || broker.socket.error || broker.socket.closing || broker.socket.destroyed) {
this.refreshBrokerMetadata();
return cb(new errors.BrokerNotAvailableError('Could not find the leader'), payloads[leader]);
}

Expand All @@ -567,21 +574,23 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
};

Client.prototype.checkMetadatas = function (payloads) {
if (_.isEmpty(this.topicMetadata)) return [ [], payloads ];
if (_.isEmpty(this.topicMetadata)) return [[], payloads];
// out: [ [metadata exists], [metadata not exists] ]
var out = [ [], [] ];
payloads.forEach(function (p) {
if (this.hasMetadata(p.topic, p.partition)) out[0].push(p);
else out[1].push(p);
}.bind(this));
var out = [[], []];
payloads.forEach(
function (p) {
if (this.hasMetadata(p.topic, p.partition)) out[0].push(p);
else out[1].push(p);
}.bind(this)
);
return out;
};

Client.prototype.hasMetadata = function (topic, partition) {
var brokerMetadata = this.brokerMetadata;
var leader = this.leaderByPartition(topic, partition);

return (leader !== undefined) && brokerMetadata[leader];
return leader !== undefined && brokerMetadata[leader];
};

Client.prototype.updateMetadatas = function (metadatas) {
Expand All @@ -598,19 +607,24 @@ Client.prototype.updateMetadatas = function (metadatas) {
};

Client.prototype.removeTopicMetadata = function (topics, cb) {
topics.forEach(function (t) {
if (this.topicMetadata[t]) delete this.topicMetadata[t];
}.bind(this));
topics.forEach(
function (t) {
if (this.topicMetadata[t]) delete this.topicMetadata[t];
}.bind(this)
);
cb(null, topics.length);
};

Client.prototype.payloadsByLeader = function (payloads) {
return payloads.reduce(function (out, p) {
var leader = this.leaderByPartition(p.topic, p.partition);
out[leader] = out[leader] || [];
out[leader].push(p);
return out;
}.bind(this), {});
return payloads.reduce(
function (out, p) {
var leader = this.leaderByPartition(p.topic, p.partition);
out[leader] = out[leader] || [];
out[leader].push(p);
return out;
}.bind(this),
{}
);
};

Client.prototype.leaderByPartition = function (topic, partition) {
Expand All @@ -633,7 +647,7 @@ Client.prototype.brokerForLeader = function (leader, longpolling) {
}
}

var broker = _.find(this.brokerProfiles, {id: leader});
var broker = _.find(this.brokerProfiles, { id: leader });

if (!broker) {
return;
Expand Down Expand Up @@ -738,15 +752,19 @@ Client.prototype.handleReceivedData = function (socket) {
var decoder = handlers[0];
var cb = handlers[1];
var result = decoder(resp);
(result instanceof Error)
? cb.call(this, result)
: cb.call(this, null, result);
result instanceof Error ? cb.call(this, result) : cb.call(this, null, result);
buffer.consume(size);
if (socket.longpolling) socket.waiting = false;
} else { return; }
} else {
return;
}

if (socket.buffer.length) {
setImmediate(function () { this.handleReceivedData(socket); }.bind(this));
setImmediate(
function () {
this.handleReceivedData(socket);
}.bind(this)
);
}
};

Expand Down
Loading