From 36336073f716ff7163574044effcdea510babea1 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Tue, 18 Apr 2017 12:01:09 -0400 Subject: [PATCH 01/28] Add initial kafka client and test --- lib/kafkaClient.js | 188 +++++++++++++++++++++++++++++++++++++++ test/test.kafkaClient.js | 36 ++++++++ 2 files changed, 224 insertions(+) create mode 100644 lib/kafkaClient.js create mode 100644 test/test.kafkaClient.js diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js new file mode 100644 index 00000000..37c2a7fc --- /dev/null +++ b/lib/kafkaClient.js @@ -0,0 +1,188 @@ +'use strict'; + +const Client = require('./client'); +const logger = require('./logging')('kafka-node:KafkaClient'); +// const async = require('async'); +// const retry = require('retry'); +const _ = require('lodash'); +const util = require('util'); +const net = require('net'); +const BufferList = require('bl'); +const tls = require('tls'); +const BrokerWrapper = require('./wrapper/BrokerWrapper'); +const errors = require('./errors'); +const validateConfig = require('./utils').validateConfig; + +const DEFAULTS = { + kafkaHost: 'localhost:9092', + connectionTimeout: 10000, + ssl: false, + autoConnect: true +}; + +const KafkaClient = function (options) { + this.options = _.defaults((options || {}), DEFAULTS); + + this.sslOptions = this.options.sslOptions; + this.ssl = !!this.sslOptions; + + if (this.options.ssl === true) { + this.options.ssl = {}; + } + + if (this.options.clientId) { + validateConfig('clientId', this.options.clientId); + } + + this.clientId = this.options.clientId || 'kafka-node-client'; + this.noAckBatchOptions = this.noAckBatchOptions; + this.brokers = {}; + this.longpollingBrokers = {}; + this.topicMetadata = {}; + this.topicPartitions = {}; + this.correlationId = 0; + this._socketId = 0; + this.cbqueue = {}; + this.brokerMetadata = {}; + this.ready = false; + if (this.options.autoConnect) { + this.connect(); + } +}; + +util.inherits(KafkaClient, Client); + +/* +{ '1001': + { jmx_port: -1, + timestamp: '1492521177416', + endpoints: [ 'PLAINTEXT://127.0.0.1:9092', 'SSL://127.0.0.1:9093' ], + host: '127.0.0.1', + version: 2, + port: '9092', + id: '1001' } } + + vs + +{ '1001': { nodeId: 1001, host: '127.0.0.1', port: 9093 } } + + */ + +function hostParse (hostString) { + const piece = hostString.split(':'); + return { + host: piece[0], + port: piece[1] + }; +} + +KafkaClient.prototype.connect = function () { + if (this.connecting) { + logger.debug('connect request ignored. Client is currently connecting'); + return; + } + this.connecting = true; + const broker = hostParse(this.options.kafkaHost); + this.connectToBroker(broker, this.options.connectionTimeout, (error) => { + if (error) { + return this.emit('error', error); + } + this.emit('ready'); + }); +}; + +KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { + logger.debug(`Trying to connect to host: ${broker.host} port: ${broker.port}`); + let timer = null; + + this.once('connect', () => { + logger.debug('connected'); + this.loadMetadataForTopics([], (error, result) => { + if (error) { + return callback(error); + } + this.brokerMetadata = result[0]; + this.updateMetadatas(result); + clearTimeout(timer); + callback(); + }); + }); + const brokerWrapper = this.setupBroker(broker.host, broker.port, false, this.brokers); + const socket = brokerWrapper.socket; + + console.log('attach socket error handler'); + socket.on('error', (error) => { + logger.debug('Socket Error', error); + clearTimeout(timer); + this.closeBrokers(this.brokers); + callback(error); + }); + + timer = setTimeout(() => { + logger.debug('Connection timeout error'); + this.closeBrokers(this.brokers); + callback(new Error(`Connection timeout of ${timeout} exceeded`)); + }, timeout); +}; + +KafkaClient.prototype.createBroker = function (host, port, longpolling) { + var self = this; + var socket; + if (self.ssl) { + socket = tls.connect(port, host, self.sslOptions); + } else { + socket = net.createConnection(port, host); + } + socket.addr = host + ':' + port; + socket.host = host; + socket.port = port; + socket.socketId = this.nextSocketId(); + if (longpolling) socket.longpolling = true; + + socket.on('connect', function () { + var lastError = this.error; + this.error = null; + if (lastError) { + this.waiting = false; + self.emit('reconnect'); + } else { + self.emit('connect'); + } + }); + socket.on('error', function (err) { + this.error = err; + if (!self.connecting) { + self.emit('error', err); + } + }); + socket.on('close', function (hadError) { + self.emit('close', this); + if (hadError && this.error) { + self.clearCallbackQueue(this, this.error); + } else { + self.clearCallbackQueue(this, new errors.BrokerNotAvailableError('Broker not available')); + } + retry(this); + }); + socket.on('end', function () { + retry(this); + }); + socket.buffer = new BufferList(); + socket.on('data', function (data) { + socket.buffer.append(data); + self.handleReceivedData(socket); + }); + socket.setKeepAlive(true, 60000); + + function retry (s) { + if (s.retrying || s.closing) return; + s.retrying = true; + s.retryTimer = setTimeout(function () { + if (s.closing) return; + self.reconnectBroker(s); + }, 1000); + } + return new BrokerWrapper(socket, this.noAckBatchOptions); +}; + +module.exports = KafkaClient; diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js new file mode 100644 index 00000000..4faf1225 --- /dev/null +++ b/test/test.kafkaClient.js @@ -0,0 +1,36 @@ +'use strict'; + +const kafka = require('..'); +const Client = kafka.KafkaClient; + + +describe('Kafka Client', function () { + it('should connect plaintext', function (done) { + const client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); + }); + + it('should error when connecting to an invalid host', function (done) { + const client = new Client({ + kafkaHost: 'localhost:9094' + }); + + client.on('error', function (error) { + console.log('test error', error) + error.code.should.be.eql('ECONNREFUSED'); + done(); + }); + }); + + it('should connect SSL', function (done) { + const client = new Client({ + kafkaHost: 'localhost:9093', + sslOptions: { + rejectUnauthorized: false + } + }); + client.once('ready', done); + }); +}); From 2ab8fbf33a7f70b19f507d4867ec2d2e49a6cbd3 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Tue, 18 Apr 2017 12:01:44 -0400 Subject: [PATCH 02/28] Expose KafkaClient --- kafka.js | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka.js b/kafka.js index dc638bc0..de761dc5 100644 --- a/kafka.js +++ b/kafka.js @@ -4,6 +4,7 @@ exports.ConsumerGroup = require('./lib/consumerGroup'); exports.Consumer = require('./lib/consumer'); exports.Producer = require('./lib/producer'); exports.Client = require('./lib/client'); +exports.KafkaClient = require('./lib/kafkaClient'); exports.Offset = require('./lib/offset'); exports.KeyedMessage = require('./lib/protocol').KeyedMessage; exports.DefaultPartitioner = require('./lib/partitioner').DefaultPartitioner; From cee3b41367d88bbbe20249f4d773e877f1beaef3 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Tue, 18 Apr 2017 12:20:39 -0400 Subject: [PATCH 03/28] Add doc for new KafkaClient --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 40979dca..40032191 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,16 @@ Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.8.1 Follow the [instructions](http://kafka.apache.org/documentation.html#quickstart) on the Kafka wiki to build Kafka 0.8 and get a test broker up and running. # API + +## KafkaClient + +New KafkaClient connects directly to Kafka brokers instead of connecting to zookeeper for broker discovery. + +### Notable differences + +* Constructor accepts an options object +* Unlike the original `Client` `KafkaClient` will not emit socket errors it will do it's best to recover and only emit errors when it has exhausted it's recovery attempts + ## Client ### Client(connectionString, clientId, [zkOptions], [noAckBatchOptions], [sslOptions]) * `connectionString`: Zookeeper connection string, default `localhost:2181/` From d37ba7b5f5217cdf97329d755530f5fad6168a54 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Wed, 19 Apr 2017 11:40:49 -0400 Subject: [PATCH 04/28] Add TimeoutError to broker connect --- lib/errors/TimeoutError.js | 13 +++++++++++++ lib/kafkaClient.js | 19 ++++++++++--------- 2 files changed, 23 insertions(+), 9 deletions(-) create mode 100644 lib/errors/TimeoutError.js diff --git a/lib/errors/TimeoutError.js b/lib/errors/TimeoutError.js new file mode 100644 index 00000000..c40a359c --- /dev/null +++ b/lib/errors/TimeoutError.js @@ -0,0 +1,13 @@ +'use strict'; + +var util = require('util'); + +var TimeoutError = function (message) { + Error.captureStackTrace(this, this); + this.message = message; +}; + +util.inherits(TimeoutError, Error); +TimeoutError.prototype.name = 'TimeoutError'; + +module.exports = TimeoutError; diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 37c2a7fc..effbeff5 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -12,10 +12,11 @@ const tls = require('tls'); const BrokerWrapper = require('./wrapper/BrokerWrapper'); const errors = require('./errors'); const validateConfig = require('./utils').validateConfig; +const TimeoutError = require('./errors/TimeoutError'); const DEFAULTS = { kafkaHost: 'localhost:9092', - connectionTimeout: 10000, + connectTimeout: 10000, ssl: false, autoConnect: true }; @@ -83,7 +84,7 @@ KafkaClient.prototype.connect = function () { } this.connecting = true; const broker = hostParse(this.options.kafkaHost); - this.connectToBroker(broker, this.options.connectionTimeout, (error) => { + this.connectToBroker(broker, this.options.connectTimeout, (error) => { if (error) { return this.emit('error', error); } @@ -93,17 +94,17 @@ KafkaClient.prototype.connect = function () { KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { logger.debug(`Trying to connect to host: ${broker.host} port: ${broker.port}`); - let timer = null; + let connectTimer = null; this.once('connect', () => { - logger.debug('connected'); + logger.debug('broker socket connected'); this.loadMetadataForTopics([], (error, result) => { if (error) { return callback(error); } this.brokerMetadata = result[0]; this.updateMetadatas(result); - clearTimeout(timer); + clearTimeout(connectTimer); callback(); }); }); @@ -113,15 +114,15 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { console.log('attach socket error handler'); socket.on('error', (error) => { logger.debug('Socket Error', error); - clearTimeout(timer); + clearTimeout(connectTimer); this.closeBrokers(this.brokers); callback(error); }); - timer = setTimeout(() => { + connectTimer = setTimeout(() => { logger.debug('Connection timeout error'); this.closeBrokers(this.brokers); - callback(new Error(`Connection timeout of ${timeout} exceeded`)); + callback(new TimeoutError(`Connection timeout of ${timeout} exceeded`)); }, timeout); }; @@ -152,7 +153,7 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) { socket.on('error', function (err) { this.error = err; if (!self.connecting) { - self.emit('error', err); + self.emit('socket_error', err); } }); socket.on('close', function (hadError) { From bbd5278a9d50570e714474a7a7b6f77169825103 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Wed, 19 Apr 2017 11:59:37 -0400 Subject: [PATCH 05/28] Add more test to verify timeout --- lib/kafkaClient.js | 16 ++++++++++------ test/test.kafkaClient.js | 40 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index effbeff5..11559e39 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -96,6 +96,13 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { logger.debug(`Trying to connect to host: ${broker.host} port: ${broker.port}`); let connectTimer = null; + const onError = (error) => { + clearTimeout(connectTimer); + connectTimer = null; + this.closeBrokers(this.brokers); + callback(error); + }; + this.once('connect', () => { logger.debug('broker socket connected'); this.loadMetadataForTopics([], (error, result) => { @@ -108,21 +115,18 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { callback(); }); }); + const brokerWrapper = this.setupBroker(broker.host, broker.port, false, this.brokers); const socket = brokerWrapper.socket; - console.log('attach socket error handler'); socket.on('error', (error) => { logger.debug('Socket Error', error); - clearTimeout(connectTimer); - this.closeBrokers(this.brokers); - callback(error); + onError(error); }); connectTimer = setTimeout(() => { logger.debug('Connection timeout error'); - this.closeBrokers(this.brokers); - callback(new TimeoutError(`Connection timeout of ${timeout} exceeded`)); + onError(new TimeoutError(`Connection timeout of ${timeout} exceeded`)); }, timeout); }; diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 4faf1225..24c1c275 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -2,7 +2,9 @@ const kafka = require('..'); const Client = kafka.KafkaClient; - +const sinon = require('sinon'); +const EventEmitter = require('events'); +const TimeoutError = require('../lib/errors/TimeoutError'); describe('Kafka Client', function () { it('should connect plaintext', function (done) { @@ -18,7 +20,6 @@ describe('Kafka Client', function () { }); client.on('error', function (error) { - console.log('test error', error) error.code.should.be.eql('ECONNREFUSED'); done(); }); @@ -33,4 +34,39 @@ describe('Kafka Client', function () { }); client.once('ready', done); }); + + describe('Verify Timeout', function () { + let sandbox; + + beforeEach(function () { + sandbox = sinon.sandbox.create(); + }); + + afterEach(function () { + sandbox.restore(); + }); + + it('should timeout when connect is not emitted', function (done) { + const clock = sandbox.useFakeTimers(); + const client = new Client({ + autoConnect: false, + kafkaHost: 'localhost:9093', + sslOptions: { + rejectUnauthorized: false + } + }); + + sandbox.stub(client, 'setupBroker').returns({ + socket: new EventEmitter() + }); + + client.connect(); + client.once('error', function (error) { + error.should.be.an.instanceOf(TimeoutError); + done(); + }); + + clock.tick(10000); + }); + }); }); From 0e231dee2baebe3aeee0a28aa236c01da57b163f Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Thu, 20 Apr 2017 11:01:31 -0400 Subject: [PATCH 06/28] Add support to timeout kafka request --- README.md | 6 +++++ lib/kafkaClient.js | 32 ++++++++++++++++++++++++++ test/test.kafkaClient.js | 49 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+) diff --git a/README.md b/README.md index 40032191..9f51080c 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,12 @@ Follow the [instructions](http://kafka.apache.org/documentation.html#quickstart) New KafkaClient connects directly to Kafka brokers instead of connecting to zookeeper for broker discovery. +### New Features + +* Kafka **ONLY** no zookeeper +* Added request timeout +* Added connection timeout + ### Notable differences * Constructor accepts an options object diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 11559e39..75adbd67 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -17,6 +17,7 @@ const TimeoutError = require('./errors/TimeoutError'); const DEFAULTS = { kafkaHost: 'localhost:9092', connectTimeout: 10000, + requestTimeout: 30000, ssl: false, autoConnect: true }; @@ -130,6 +131,37 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { }, timeout); }; +KafkaClient.prototype.wrapTimeoutIfNeeded = function (socketId, correlationId, callback) { + if (this.options.requestTimeout === false) { + return callback; + } + + let timeoutId = null; + + const wrappedFn = function () { + clear(); + callback.apply(null, arguments); + }; + + function clear () { + clearTimeout(timeoutId); + timeoutId = null; + } + + timeoutId = setTimeout(() => { + this.unqueueCallback(socketId, correlationId); + callback(new Error(`Request timed out after ${this.options.requestTimeout}ms`)); + callback = _.noop; + }, this.options.requestTimeout); + + return wrappedFn; +}; + +KafkaClient.prototype.queueCallback = function (socket, id, data) { + data[1] = this.wrapTimeoutIfNeeded(socket.socketId, id, data[1]); + Client.prototype.queueCallback.call(this, socket, id, data); +}; + KafkaClient.prototype.createBroker = function (host, port, longpolling) { var self = this; var socket; diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 24c1c275..4f5a3abd 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -7,6 +7,55 @@ const EventEmitter = require('events'); const TimeoutError = require('../lib/errors/TimeoutError'); describe('Kafka Client', function () { + describe('#wrapTimeoutIfNeeded', function () { + let sandbox, wrapTimeoutIfNeeded, client, clock; + + beforeEach(function () { + sandbox = sinon.sandbox.create(); + clock = sandbox.useFakeTimers(); + client = { + unqueueCallback: sandbox.stub(), + options: { + requestTimeout: false + } + }; + wrapTimeoutIfNeeded = Client.prototype.wrapTimeoutIfNeeded.bind(client); + }); + + afterEach(function () { + sandbox.restore(); + }); + + it('should not wrap if there is not a this.options.requestTimeout', function () { + const myFn = function () {}; + const retFn = wrapTimeoutIfNeeded(1, 1, myFn); + myFn.should.be.exactly(retFn); + }); + + it('should not yield timeout if returned callback is called in time', function (done) { + client.options.requestTimeout = 400; + const retFn = wrapTimeoutIfNeeded(1, 1, done); + retFn.should.not.be.exactly(done); + clock.tick(300); + retFn(); + clock.tick(300); + }); + + it('should yield timeout error if not called by timeout', function (done) { + client.options.requestTimeout = 400; + function callback (error) { + error.should.be.an.instanceOf(Error); + error.message.should.be.exactly('Request timed out after 400ms'); + sinon.assert.calledWithExactly(client.unqueueCallback, 1, 10); + done(); + } + const retFn = wrapTimeoutIfNeeded(1, 10, callback); + retFn.should.not.be.exactly(callback); + clock.tick(400); + retFn(new Error('BAD')); + }); + }); + it('should connect plaintext', function (done) { const client = new Client({ kafkaHost: 'localhost:9092' From 0bf71985f7b7e28bdb8337d3f992fe57a13ed1a3 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 21 Apr 2017 20:20:44 -0400 Subject: [PATCH 07/28] Attempt to connect to a list of initial kafka hosts --- lib/kafkaClient.js | 54 +++++++++++++++++++++++---- test/test.kafkaClient.js | 80 ++++++++++++++++++++++++++++++---------- 2 files changed, 106 insertions(+), 28 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 75adbd67..f282b614 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -2,8 +2,9 @@ const Client = require('./client'); const logger = require('./logging')('kafka-node:KafkaClient'); -// const async = require('async'); +const async = require('async'); // const retry = require('retry'); +const assert = require('assert'); const _ = require('lodash'); const util = require('util'); const net = require('net'); @@ -47,6 +48,9 @@ const KafkaClient = function (options) { this.cbqueue = {}; this.brokerMetadata = {}; this.ready = false; + + this.initialHosts = parseHostList(this.options.kafkaHost); + if (this.options.autoConnect) { this.connect(); } @@ -70,7 +74,7 @@ util.inherits(KafkaClient, Client); */ -function hostParse (hostString) { +function parseHost (hostString) { const piece = hostString.split(':'); return { host: piece[0], @@ -78,14 +82,19 @@ function hostParse (hostString) { }; } +function parseHostList (hosts) { + return hosts.split(',').map(parseHost); +} + KafkaClient.prototype.connect = function () { if (this.connecting) { logger.debug('connect request ignored. Client is currently connecting'); return; } this.connecting = true; - const broker = hostParse(this.options.kafkaHost); - this.connectToBroker(broker, this.options.connectTimeout, (error) => { + + this.connectToBrokers(this.initialHosts, error => { + this.connecting = false; if (error) { return this.emit('error', error); } @@ -93,6 +102,35 @@ KafkaClient.prototype.connect = function () { }); }; +KafkaClient.prototype.connectToBrokers = function (hosts, callback) { + assert(hosts && hosts.length, 'No hosts to connect to'); + let index = 0; + let ready = false; + let errors = []; + async.doWhilst(callback => { + this.connectToBroker(hosts[index++], this.options.connectTimeout, error => { + if (error) { + logger.debug('failed to connect because of ', error); + errors.push(error); + callback(null); + return; + } + errors.length = 0; + ready = true; + callback(null); + }); + }, + () => !ready && index < hosts.length, + () => { + if (ready) { + return callback(null); + } + if (errors.length) { + callback(errors.pop()); + } + }); +}; + KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { logger.debug(`Trying to connect to host: ${broker.host} port: ${broker.port}`); let connectTimer = null; @@ -104,7 +142,10 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { callback(error); }; - this.once('connect', () => { + const brokerWrapper = this.setupBroker(broker.host, broker.port, false, this.brokers); + const socket = brokerWrapper.socket; + + socket.once('connect', () => { logger.debug('broker socket connected'); this.loadMetadataForTopics([], (error, result) => { if (error) { @@ -117,9 +158,6 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { }); }); - const brokerWrapper = this.setupBroker(broker.host, broker.port, false, this.brokers); - const socket = brokerWrapper.socket; - socket.on('error', (error) => { logger.debug('Socket Error', error); onError(error); diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 4f5a3abd..535f3d7c 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -7,6 +7,43 @@ const EventEmitter = require('events'); const TimeoutError = require('../lib/errors/TimeoutError'); describe('Kafka Client', function () { + describe('#parseHostList', function () { + it('initial hosts should be parsed if single host is provided', function () { + const client = new Client({ + autoConnect: false, + kafkaHost: 'localhost:9092' + }); + + client.initialHosts.should.not.be.empty; + client.initialHosts.length.should.be.eql(1); + client.initialHosts[0].host.should.be.eql('localhost'); + client.initialHosts[0].port.should.be.eql('9092'); + }); + + it('initial hosts should be parsed if multiple hosts are provided', function () { + const client = new Client({ + autoConnect: false, + kafkaHost: 'localhost:9092,127.0.0.1:9093,192.168.1.0:9094' + }); + + client.initialHosts.should.not.be.empty; + client.initialHosts.should.be.eql([ + { + host: 'localhost', + port: '9092' + }, + { + host: '127.0.0.1', + port: '9093' + }, + { + host: '192.168.1.0', + port: '9094' + } + ]); + }); + }); + describe('#wrapTimeoutIfNeeded', function () { let sandbox, wrapTimeoutIfNeeded, client, clock; @@ -56,32 +93,35 @@ describe('Kafka Client', function () { }); }); - it('should connect plaintext', function (done) { - const client = new Client({ - kafkaHost: 'localhost:9092' + describe('#connect', function () { + it('should connect plaintext', function (done) { + const client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); }); - client.once('ready', done); - }); - it('should error when connecting to an invalid host', function (done) { - const client = new Client({ - kafkaHost: 'localhost:9094' - }); + it('should error when connecting to an invalid host', function (done) { + const client = new Client({ + retries: 3, + kafkaHost: 'localhost:9094' + }); - client.on('error', function (error) { - error.code.should.be.eql('ECONNREFUSED'); - done(); + client.on('error', function (error) { + error.code.should.be.eql('ECONNREFUSED'); + done(); + }); }); - }); - it('should connect SSL', function (done) { - const client = new Client({ - kafkaHost: 'localhost:9093', - sslOptions: { - rejectUnauthorized: false - } + it('should connect SSL', function (done) { + const client = new Client({ + kafkaHost: 'localhost:9093', + sslOptions: { + rejectUnauthorized: false + } + }); + client.once('ready', done); }); - client.once('ready', done); }); describe('Verify Timeout', function () { From 70416ed79a5637cf5696f9d3857f8fde26f982b0 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Sat, 22 Apr 2017 11:11:46 -0400 Subject: [PATCH 08/28] Connect to set of brokers with retries --- lib/kafkaClient.js | 35 +++++++++++++++++++++++++++-------- test/test.kafkaClient.js | 7 ++++++- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index f282b614..fa1ab6ce 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -3,7 +3,7 @@ const Client = require('./client'); const logger = require('./logging')('kafka-node:KafkaClient'); const async = require('async'); -// const retry = require('retry'); +const retry = require('retry'); const assert = require('assert'); const _ = require('lodash'); const util = require('util'); @@ -20,7 +20,14 @@ const DEFAULTS = { connectTimeout: 10000, requestTimeout: 30000, ssl: false, - autoConnect: true + autoConnect: true, + connectRetryOptions: { + retries: 5, + factor: 2, + minTimeout: 1 * 1000, + maxTimeout: 60 * 1000, + randomize: true + } }; const KafkaClient = function (options) { @@ -93,12 +100,24 @@ KafkaClient.prototype.connect = function () { } this.connecting = true; - this.connectToBrokers(this.initialHosts, error => { - this.connecting = false; - if (error) { - return this.emit('error', error); - } - this.emit('ready'); + const connect = retry.operation(this.options.connectRetryOptions); + + connect.attempt(currentAttempt => { + logger.debug(`Connect attempt ${currentAttempt}`); + this.connectToBrokers(this.initialHosts, error => { + if (connect.retry(error)) { + return; + } + + this.connecting = false; + + if (error) { + logger.debug('exhausted retries. Main error', connect.mainError()); + this.emit('error', connect.mainError()); + return; + } + this.emit('ready'); + }); }); }; diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 535f3d7c..89bd62fc 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -103,7 +103,9 @@ describe('Kafka Client', function () { it('should error when connecting to an invalid host', function (done) { const client = new Client({ - retries: 3, + connectRetryOptions: { + retries: 0 + }, kafkaHost: 'localhost:9094' }); @@ -138,6 +140,9 @@ describe('Kafka Client', function () { it('should timeout when connect is not emitted', function (done) { const clock = sandbox.useFakeTimers(); const client = new Client({ + connectRetryOptions: { + retries: 0 + }, autoConnect: false, kafkaHost: 'localhost:9093', sslOptions: { From df6972dd1924658a9c348571b4f74455bf5bcb9f Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Sat, 22 Apr 2017 23:30:46 -0400 Subject: [PATCH 09/28] consumerGroup can use kafkaClient --- lib/consumerGroup.js | 9 +++++++-- lib/kafkaClient.js | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/lib/consumerGroup.js b/lib/consumerGroup.js index 1e5f77fe..112ad79e 100644 --- a/lib/consumerGroup.js +++ b/lib/consumerGroup.js @@ -5,6 +5,7 @@ const util = require('util'); const EventEmitter = require('events'); const highLevelConsumer = require('./highLevelConsumer'); const Client = require('./client'); +const KafkaClient = require('./kafkaClient'); const Offset = require('./offset'); const _ = require('lodash'); const async = require('async'); @@ -70,8 +71,12 @@ function ConsumerGroup (memberOptions, topics) { throw new Error(`outOfRangeOffset ${this.options.outOfRangeOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`); } - this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk, - memberOptions.batch, memberOptions.ssl); + if (memberOptions.kafkaHost) { + this.client = new KafkaClient(memberOptions); + } else { + this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk, + memberOptions.batch, memberOptions.ssl); + } if (_.isString(topics)) { topics = [topics]; diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index fa1ab6ce..84b49e11 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -188,6 +188,32 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { }, timeout); }; +KafkaClient.prototype.brokerForLeader = function (leader, longpolling) { + var addr; + var brokers = this.getBrokers(longpolling); + // If leader is not give, choose the first broker as leader + if (typeof leader === 'undefined') { + if (!_.isEmpty(brokers)) { + addr = Object.keys(brokers)[0]; + return brokers[addr]; + } else if (!_.isEmpty(this.brokerMetadata)) { + leader = Object.keys(this.brokerMetadata)[0]; + } else { + return; + } + } + + var broker = this.brokerMetadata[leader]; + + if (!broker) { + return; + } + + addr = broker.host + ':' + broker.port; + + return brokers[addr] || this.setupBroker(broker.host, broker.port, longpolling, brokers); +}; + KafkaClient.prototype.wrapTimeoutIfNeeded = function (socketId, correlationId, callback) { if (this.options.requestTimeout === false) { return callback; @@ -219,6 +245,14 @@ KafkaClient.prototype.queueCallback = function (socket, id, data) { Client.prototype.queueCallback.call(this, socket, id, data); }; +KafkaClient.prototype.close = function (callback) { + this.closeBrokers(this.brokers); + this.closeBrokers(this.longpollingBrokers); + setImmediate(function () { + callback(null); + }); +}; + KafkaClient.prototype.createBroker = function (host, port, longpolling) { var self = this; var socket; From f23fc14b395b8d5a49fcc3c479c0bb9b4d6cad42 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 19 May 2017 13:40:12 -0400 Subject: [PATCH 10/28] ConsumerGroup should throw if migration is turned on and KafkaClient is used --- lib/consumerGroup.js | 4 ++++ test/test.consumerGroup.js | 12 ++++++++++++ 2 files changed, 16 insertions(+) diff --git a/lib/consumerGroup.js b/lib/consumerGroup.js index 112ad79e..106e9a29 100644 --- a/lib/consumerGroup.js +++ b/lib/consumerGroup.js @@ -95,6 +95,10 @@ function ConsumerGroup (memberOptions, topics) { } if (this.options.migrateHLC) { + if (this.client instanceof KafkaClient) { + throw new Error('KafkaClient cannot be used to migrate from Zookeeper use Client instead'); + } + const ConsumerGroupMigrator = require('./consumerGroupMigrator'); this.migrator = new ConsumerGroupMigrator(this); this.migrator.on('error', function (error) { diff --git a/test/test.consumerGroup.js b/test/test.consumerGroup.js index 2ac654b3..0b2f6902 100644 --- a/test/test.consumerGroup.js +++ b/test/test.consumerGroup.js @@ -723,6 +723,18 @@ describe('ConsumerGroup', function () { }); }); + describe('KafkaClient support', function () { + it('should throw error if migration option is used with KafkaClient', function () { + should.throws(function () { + // eslint-disable-next-line no-new + new ConsumerGroup({ + kafkaHost: 'localhost:9092', + migrateHLC: true + }); + }); + }); + }); + describe('#scheduleReconnect', function () { var consumerGroup, sandbox; From b6b5a495ff88d9215634e81a640210391201e195 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 19 May 2017 13:49:14 -0400 Subject: [PATCH 11/28] double check brokerMetadata after connect --- test/test.kafkaClient.js | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 89bd62fc..410b3e82 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -3,8 +3,8 @@ const kafka = require('..'); const Client = kafka.KafkaClient; const sinon = require('sinon'); -const EventEmitter = require('events'); const TimeoutError = require('../lib/errors/TimeoutError'); +const FakeSocket = require('./mocks/mockSocket'); describe('Kafka Client', function () { describe('#parseHostList', function () { @@ -98,7 +98,11 @@ describe('Kafka Client', function () { const client = new Client({ kafkaHost: 'localhost:9092' }); - client.once('ready', done); + client.once('error', done); + client.once('ready', function () { + client.brokerMetadata.should.not.be.empty; + done(); + }); }); it('should error when connecting to an invalid host', function (done) { @@ -122,7 +126,11 @@ describe('Kafka Client', function () { rejectUnauthorized: false } }); - client.once('ready', done); + client.once('error', done); + client.once('ready', function () { + client.brokerMetadata.should.not.be.empty; + done(); + }); }); }); @@ -151,7 +159,7 @@ describe('Kafka Client', function () { }); sandbox.stub(client, 'setupBroker').returns({ - socket: new EventEmitter() + socket: new FakeSocket() }); client.connect(); From dc6edb66e77cafc8f722e9de2bf6335025ba7586 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 19 May 2017 15:15:59 -0400 Subject: [PATCH 12/28] add setter for brokerMetadata --- lib/kafkaClient.js | 11 ++++++ test/test.kafkaClient.js | 85 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 84b49e11..31ddd30e 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -188,6 +188,17 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { }, timeout); }; +KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) { + assert(brokerMetadata, 'brokerMetadata is empty'); + const oldBrokerMetadata = this.brokerMetadata; + this.brokerMetadata = brokerMetadata; + this.brokerMetadataLastUpdate = Date.now(); + + if (!_.isEmpty(oldBrokerMetadata) && !_.isEqual(oldBrokerMetadata, brokerMetadata)) { + setImmediate(() => this.emit('brokersChanged')); + } +}; + KafkaClient.prototype.brokerForLeader = function (leader, longpolling) { var addr; var brokers = this.getBrokers(longpolling); diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 410b3e82..a0042bc1 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -5,6 +5,8 @@ const Client = kafka.KafkaClient; const sinon = require('sinon'); const TimeoutError = require('../lib/errors/TimeoutError'); const FakeSocket = require('./mocks/mockSocket'); +const should = require('should'); +const _ = require('lodash'); describe('Kafka Client', function () { describe('#parseHostList', function () { @@ -93,6 +95,89 @@ describe('Kafka Client', function () { }); }); + describe('#setBrokerMetadata', function () { + let clock; + + beforeEach(function () { + clock = sinon.useFakeTimers(); + }); + + afterEach(function () { + clock.restore(); + }); + + it('should set new brokerMetadata field on client no emit', function () { + const client = new Client({ + autoConnect: false, + kafkaHost: 'Kafka-1.us-east-1.myapp.com:9093' + }); + + const brokerMetadata = { + '1': { nodeId: 1, host: 'Kafka-1.us-east-1.myapp.com', port: 9093 }, + '2': { nodeId: 2, host: 'Kafka-2.us-east-1.myapp.com', port: 9093 }, + '3': { nodeId: 3, host: 'Kafka-3.us-east-1.myapp.com', port: 9093 } + }; + + client.on('brokersChanged', function () { + throw new Error('should not emit'); + }); + + client.setBrokerMetadata(brokerMetadata); + client.brokerMetadata.should.be.eql(brokerMetadata); + client.brokerMetadataLastUpdate.should.be.eql(0); + clock.tick(100); + }); + + it('should set same brokerMetadata field on client no emit', function () { + const client = new Client({ + autoConnect: false, + kafkaHost: 'Kafka-1.us-east-1.myapp.com:9093' + }); + + const brokerMetadata = { + '1': { nodeId: 1, host: 'Kafka-1.us-east-1.myapp.com', port: 9093 }, + '2': { nodeId: 2, host: 'Kafka-2.us-east-1.myapp.com', port: 9093 }, + '3': { nodeId: 3, host: 'Kafka-3.us-east-1.myapp.com', port: 9093 } + }; + + client.brokerMetadata = brokerMetadata; + should(client.brokerMetadataLastUpdate).be.empty; + + client.on('brokersChanged', function () { + throw new Error('should not emit'); + }); + + client.setBrokerMetadata(brokerMetadata); + client.brokerMetadata.should.be.eql(brokerMetadata); + client.brokerMetadataLastUpdate.should.be.eql(0); + clock.tick(100); + }); + + it('should set different brokerMetadata field on client no emit', function (done) { + const client = new Client({ + autoConnect: false, + kafkaHost: 'Kafka-1.us-east-1.myapp.com:9093' + }); + + const brokerMetadata = { + '1': { nodeId: 1, host: 'Kafka-1.us-east-1.myapp.com', port: 9093 }, + '2': { nodeId: 2, host: 'Kafka-2.us-east-1.myapp.com', port: 9093 }, + '3': { nodeId: 3, host: 'Kafka-3.us-east-1.myapp.com', port: 9093 } + }; + + client.brokerMetadata = _.clone(brokerMetadata); + should(client.brokerMetadataLastUpdate).be.empty; + + delete brokerMetadata['1']; + client.on('brokersChanged', done); + + client.setBrokerMetadata(brokerMetadata); + client.brokerMetadata.should.be.eql(brokerMetadata); + client.brokerMetadataLastUpdate.should.be.eql(0); + clock.tick(100); + }); + }); + describe('#connect', function () { it('should connect plaintext', function (done) { const client = new Client({ From ee1fd9ae010b5d5ac2de82b56d37a41e30c3f8e1 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 19 May 2017 15:16:24 -0400 Subject: [PATCH 13/28] Clarify when ready is emitted in KafkaClient --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 9f51080c..cbfc8fd3 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ New KafkaClient connects directly to Kafka brokers instead of connecting to zook * Constructor accepts an options object * Unlike the original `Client` `KafkaClient` will not emit socket errors it will do it's best to recover and only emit errors when it has exhausted it's recovery attempts +* `ready` event is only emitted after successful connection to a broker and metadata request to that broker ## Client ### Client(connectionString, clientId, [zkOptions], [noAckBatchOptions], [sslOptions]) From 976020f6874b12760c3de8fa8b1a0078bfb94377 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 19 May 2017 15:16:41 -0400 Subject: [PATCH 14/28] Add broker wrapper helper --- lib/wrapper/BrokerWrapper.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/wrapper/BrokerWrapper.js b/lib/wrapper/BrokerWrapper.js index eacd09f0..04e86612 100644 --- a/lib/wrapper/BrokerWrapper.js +++ b/lib/wrapper/BrokerWrapper.js @@ -22,6 +22,10 @@ var BrokerWrapper = function (socket, noAckBatchOptions) { this.readableSocket = readable; }; +BrokerWrapper.prototype.isConnected = function () { + return !this.socket.destroyed && !this.socket.closing && !this.socket.error; +}; + BrokerWrapper.prototype.write = function (buffer) { this.socket.write(buffer); }; From 74f4248d4aba9a27851c8a2ae1fd4628ff60327d Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 19 May 2017 15:30:09 -0400 Subject: [PATCH 15/28] add getAvailableBroker method and updateMetadata should update brokerMetadata as well --- lib/kafkaClient.js | 97 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 22 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 31ddd30e..1bce2899 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -116,18 +116,27 @@ KafkaClient.prototype.connect = function () { this.emit('error', connect.mainError()); return; } - this.emit('ready'); + + this.loadMetadataForTopics([], (error, result) => { + if (error) { + logger.debug('loadMetadataForTopics after connect failed', error); + return this.emit('error', error); + } + this.updateMetadatas(result); + this.emit('ready'); + }); }); }); }; KafkaClient.prototype.connectToBrokers = function (hosts, callback) { assert(hosts && hosts.length, 'No hosts to connect to'); + hosts = _.shuffle(hosts); let index = 0; - let ready = false; let errors = []; + let broker = null; async.doWhilst(callback => { - this.connectToBroker(hosts[index++], this.options.connectTimeout, error => { + this.connectToBroker(hosts[index++], (error, connectedBroker) => { if (error) { logger.debug('failed to connect because of ', error); errors.push(error); @@ -135,29 +144,33 @@ KafkaClient.prototype.connectToBrokers = function (hosts, callback) { return; } errors.length = 0; - ready = true; + broker = connectedBroker; callback(null); }); }, - () => !ready && index < hosts.length, + () => !broker && index < hosts.length, () => { - if (ready) { - return callback(null); + if (broker) { + return callback(null, broker); } + if (errors.length) { callback(errors.pop()); } }); }; -KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { +KafkaClient.prototype.connectToBroker = function (broker, callback) { + const timeout = this.options.connectTimeout; logger.debug(`Trying to connect to host: ${broker.host} port: ${broker.port}`); let connectTimer = null; const onError = (error) => { clearTimeout(connectTimer); connectTimer = null; - this.closeBrokers(this.brokers); + socket.closing = true; + socket.end(); + delete this.brokers[`${broker.host}:${broker.port}`]; callback(error); }; @@ -165,29 +178,63 @@ KafkaClient.prototype.connectToBroker = function (broker, timeout, callback) { const socket = brokerWrapper.socket; socket.once('connect', () => { - logger.debug('broker socket connected'); - this.loadMetadataForTopics([], (error, result) => { - if (error) { - return callback(error); - } - this.brokerMetadata = result[0]; - this.updateMetadatas(result); - clearTimeout(connectTimer); - callback(); - }); + logger.debug('broker socket connected %j', broker); + clearTimeout(connectTimer); + callback(null, brokerWrapper); }); - socket.on('error', (error) => { + socket.on('error', function (error) { logger.debug('Socket Error', error); onError(error); }); - connectTimer = setTimeout(() => { - logger.debug('Connection timeout error'); + connectTimer = setTimeout(function () { + logger.debug('Connection timeout error with broker %j', broker); onError(new TimeoutError(`Connection timeout of ${timeout} exceeded`)); }, timeout); }; +KafkaClient.prototype.setupBroker = function (host, port, longpolling, brokers) { + var brokerKey = host + ':' + port; + brokers[brokerKey] = this.createBroker(host, port, longpolling); + return brokers[brokerKey]; +}; + +// returns a connected broker +KafkaClient.prototype.getAvailableBroker = function (callback) { + const brokers = this.getBrokers(); + const connectedBrokers = _.filter(brokers, function (broker) { + return broker.isConnected(); + }); + + if (connectedBrokers.length) { + logger.debug('found %d connected broker(s)', connectedBrokers.length); + return callback(null, _.sample(connectedBrokers)); + } + + let brokersToTry; + + if (_.isEmpty(brokers)) { + brokersToTry = _.values(this.brokerMetadata); + } else { + const badBrokers = Object.keys(brokers); + brokersToTry = _.filter(this.brokerMetadata, function (broker) { + return !_.includes(badBrokers, `${broker.host}:${broker.port}`); + }); + } + + if (_.isEmpty(brokersToTry)) { + return callback(new Error('Unable to find available brokers to try')); + } + + this.connectToBrokers(brokersToTry, function (error, broker) { + if (error) { + return callback(error); + } + callback(null, broker); + }); +}; + KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) { assert(brokerMetadata, 'brokerMetadata is empty'); const oldBrokerMetadata = this.brokerMetadata; @@ -199,6 +246,11 @@ KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) { } }; +KafkaClient.prototype.updateMetadatas = function (metadatas) { + this.setBrokerMetadata(metadatas[0]); + Client.prototype.updateMetadatas.call(this, metadatas); +}; + KafkaClient.prototype.brokerForLeader = function (leader, longpolling) { var addr; var brokers = this.getBrokers(longpolling); @@ -318,6 +370,7 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) { s.retrying = true; s.retryTimer = setTimeout(function () { if (s.closing) return; + logger.debug('reconnecting broker'); self.reconnectBroker(s); }, 1000); } From 9d56854b0465c91d292b4770b8a7654bd61fba3c Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Fri, 19 May 2017 15:32:19 -0400 Subject: [PATCH 16/28] formatting --- lib/kafkaClient.js | 44 +++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 1bce2899..7f59bfab 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -31,7 +31,7 @@ const DEFAULTS = { }; const KafkaClient = function (options) { - this.options = _.defaults((options || {}), DEFAULTS); + this.options = _.defaults(options || {}, DEFAULTS); this.sslOptions = this.options.sslOptions; this.ssl = !!this.sslOptions; @@ -135,29 +135,31 @@ KafkaClient.prototype.connectToBrokers = function (hosts, callback) { let index = 0; let errors = []; let broker = null; - async.doWhilst(callback => { - this.connectToBroker(hosts[index++], (error, connectedBroker) => { - if (error) { - logger.debug('failed to connect because of ', error); - errors.push(error); + async.doWhilst( + callback => { + this.connectToBroker(hosts[index++], (error, connectedBroker) => { + if (error) { + logger.debug('failed to connect because of ', error); + errors.push(error); + callback(null); + return; + } + errors.length = 0; + broker = connectedBroker; callback(null); - return; + }); + }, + () => !broker && index < hosts.length, + () => { + if (broker) { + return callback(null, broker); } - errors.length = 0; - broker = connectedBroker; - callback(null); - }); - }, - () => !broker && index < hosts.length, - () => { - if (broker) { - return callback(null, broker); - } - if (errors.length) { - callback(errors.pop()); + if (errors.length) { + callback(errors.pop()); + } } - }); + ); }; KafkaClient.prototype.connectToBroker = function (broker, callback) { @@ -165,7 +167,7 @@ KafkaClient.prototype.connectToBroker = function (broker, callback) { logger.debug(`Trying to connect to host: ${broker.host} port: ${broker.port}`); let connectTimer = null; - const onError = (error) => { + const onError = error => { clearTimeout(connectTimer); connectTimer = null; socket.closing = true; From 2e9252bf934d4c40cc48cb6ed93faf5f28266718 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Mon, 22 May 2017 17:29:07 -0400 Subject: [PATCH 17/28] Added method to refresh the broker metadata --- lib/kafkaClient.js | 69 ++++++++++++++++++++++--- test/test.kafkaClient.js | 108 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 170 insertions(+), 7 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 7f59bfab..ce3514ba 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -14,12 +14,13 @@ const BrokerWrapper = require('./wrapper/BrokerWrapper'); const errors = require('./errors'); const validateConfig = require('./utils').validateConfig; const TimeoutError = require('./errors/TimeoutError'); +const protocol = require('./protocol'); +const NestedError = require('nested-error-stacks'); const DEFAULTS = { kafkaHost: 'localhost:9092', connectTimeout: 10000, requestTimeout: 30000, - ssl: false, autoConnect: true, connectRetryOptions: { retries: 5, @@ -122,6 +123,7 @@ KafkaClient.prototype.connect = function () { logger.debug('loadMetadataForTopics after connect failed', error); return this.emit('error', error); } + console.log(result); this.updateMetadatas(result); this.emit('ready'); }); @@ -229,12 +231,65 @@ KafkaClient.prototype.getAvailableBroker = function (callback) { return callback(new Error('Unable to find available brokers to try')); } - this.connectToBrokers(brokersToTry, function (error, broker) { - if (error) { - return callback(error); - } - callback(null, broker); + this.connectToBrokers(brokersToTry, callback); +}; + +KafkaClient.prototype.refreshBrokers = function () { + var self = this; + var validBrokers = _.map(this.brokerMetadata, function (broker) { + return `${broker.host}:${broker.port}`; }); + + function closeDeadBrokers (brokers) { + var deadBrokerKeys = _.difference(Object.keys(brokers), validBrokers); + if (deadBrokerKeys.length) { + self.closeBrokers( + deadBrokerKeys.map(function (key) { + var broker = brokers[key]; + delete brokers[key]; + return broker; + }) + ); + } + } + + closeDeadBrokers(this.brokers); + closeDeadBrokers(this.longpollingBrokers); +}; + +KafkaClient.prototype.refreshBrokerMetadata = function (callback) { + if (this.refreshingMetadata) { + return; + } + + if (callback == null) { + callback = _.noop; + } + + this.refreshingMetadata = true; + + async.waterfall( + [callback => this.getAvailableBroker(callback), (broker, callback) => this.loadMetadataFrom(broker, callback)], + (error, result) => { + this.refreshingMetadata = false; + if (error) { + callback(error); + return this.emit('error', new NestedError('refreshBrokerMetadata failed', error)); + } + this.updateMetadatas(result); + this.refreshBrokers(); + callback(error); + } + ); +}; + +Client.prototype.loadMetadataFrom = function (broker, cb) { + assert(broker && broker.isConnected()); + var correlationId = this.nextId(); + var request = protocol.encodeMetadataRequest(this.clientId, correlationId, []); + + this.queueCallback(broker.socket, correlationId, [protocol.decodeMetadataResponse, cb]); + broker.write(request); }; KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) { @@ -249,6 +304,8 @@ KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) { }; KafkaClient.prototype.updateMetadatas = function (metadatas) { + assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect'); + logger.debug('updating metadatas to', metadatas); this.setBrokerMetadata(metadatas[0]); Client.prototype.updateMetadatas.call(this, metadatas); }; diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index a0042bc1..dea6df27 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -4,6 +4,7 @@ const kafka = require('..'); const Client = kafka.KafkaClient; const sinon = require('sinon'); const TimeoutError = require('../lib/errors/TimeoutError'); +const BrokerWrapper = require('../lib/wrapper/BrokerWrapper'); const FakeSocket = require('./mocks/mockSocket'); const should = require('should'); const _ = require('lodash'); @@ -153,7 +154,7 @@ describe('Kafka Client', function () { clock.tick(100); }); - it('should set different brokerMetadata field on client no emit', function (done) { + it('should set different brokerMetadata field on client emit', function (done) { const client = new Client({ autoConnect: false, kafkaHost: 'Kafka-1.us-east-1.myapp.com:9093' @@ -219,6 +220,111 @@ describe('Kafka Client', function () { }); }); + describe('#refreshBrokerMetadata', function () { + let sandbox, client; + + beforeEach(function () { + sandbox = sinon.sandbox.create(); + client = new Client({ + connectRetryOptions: { + retries: 0 + }, + autoConnect: false, + kafkaHost: 'localhost:9093', + sslOptions: { + rejectUnauthorized: false + } + }); + }); + + afterEach(function () { + sandbox.restore(); + }); + + it('should refresh broker metadata using available broker', function (done) { + const fakeBroker = new BrokerWrapper(new FakeSocket()); + const fakeDeadBroker = new BrokerWrapper(new FakeSocket()); + + const metadata = [ + { + '1': { nodeId: 1, host: 'Kafka-1.us-east-1.myapp.com', port: 9093 }, + '2': { nodeId: 2, host: 'Kafka-2.us-east-1.myapp.com', port: 9093 }, + '3': { nodeId: 3, host: 'Kafka-3.us-east-1.myapp.com', port: 9093 } + }, + { metadata: {} } + ]; + + client.brokerMetadata = _.clone(metadata[0]); + + const newBrokerKey = `${metadata[0][2].host}:${metadata[0][2].port}`; + const deadBrokerKey = `${metadata[0][1].host}:${metadata[0][1].port}`; + + client.brokers[deadBrokerKey] = fakeDeadBroker; + client.brokers[newBrokerKey] = fakeBroker; + + delete metadata[0]['1']; + + Object.assign(fakeBroker.socket, metadata['2']); + + sandbox.stub(client, 'getAvailableBroker').yields(null, fakeBroker); + sandbox.stub(client, 'loadMetadataFrom').yields(null, metadata); + sandbox.spy(client, 'updateMetadatas'); + sandbox.spy(client, 'refreshBrokers'); + + should(client.refreshingMetadata).be.empty; + + client.refreshBrokerMetadata(function (error) { + sinon.assert.calledOnce(client.getAvailableBroker); + sinon.assert.calledWith(client.loadMetadataFrom, fakeBroker, sinon.match.func); + sinon.assert.calledWith(client.updateMetadatas, metadata); + sinon.assert.calledOnce(client.refreshBrokers); + sinon.assert.callOrder( + client.getAvailableBroker, + client.loadMetadataFrom, + client.updateMetadatas, + client.refreshBrokers + ); + + client.brokers.should.have.property(newBrokerKey).and.be.exactly(fakeBroker); + client.brokers.should.not.have.property(deadBrokerKey); + + done(error); + }); + }); + + it('should emit an error', function (done) { + const expectedError = new Error('Unable to find available brokers to try'); + sandbox.stub(client, 'getAvailableBroker').yields(expectedError); + sandbox.stub(client, 'loadMetadataFrom'); + sandbox.stub(client, 'updateMetadatas'); + sandbox.stub(client, 'refreshBrokers'); + + client.on('error', function (error) { + error.should.be.an.instanceOf(Error); + error.nested.should.be.eql(expectedError); + done(); + }); + + client.refreshBrokerMetadata(); + }); + + it('should not perform refreshBrokerMetadata if one is in progress', function () { + sandbox.stub(client, 'getAvailableBroker'); + sandbox.stub(client, 'loadMetadataFrom'); + sandbox.stub(client, 'updateMetadatas'); + sandbox.stub(client, 'refreshBrokers'); + client.refreshingMetadata = true; + + client.refreshBrokerMetadata(); + + client.refreshingMetadata.should.be.true; + sinon.assert.notCalled(client.getAvailableBroker); + sinon.assert.notCalled(client.loadMetadataFrom); + sinon.assert.notCalled(client.updateMetadatas); + sinon.assert.notCalled(client.refreshBrokers); + }); + }); + describe('Verify Timeout', function () { let sandbox; From 3555a8f5daa47d23092bfb6af3863e17bd5606ba Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Thu, 25 May 2017 17:09:15 -0400 Subject: [PATCH 18/28] Add to ConsumerGroup using Kafka Client to rebalance tests --- lib/consumerGroup.js | 1 + test/helpers/child-cg-kafka-client.js | 68 +++++++++++++++++++++++++++ test/test.rebalance.js | 4 ++ 3 files changed, 73 insertions(+) create mode 100644 test/helpers/child-cg-kafka-client.js diff --git a/lib/consumerGroup.js b/lib/consumerGroup.js index 106e9a29..65bfd217 100644 --- a/lib/consumerGroup.js +++ b/lib/consumerGroup.js @@ -72,6 +72,7 @@ function ConsumerGroup (memberOptions, topics) { } if (memberOptions.kafkaHost) { + memberOptions.clientId = memberOptions.id; this.client = new KafkaClient(memberOptions); } else { this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk, diff --git a/test/helpers/child-cg-kafka-client.js b/test/helpers/child-cg-kafka-client.js new file mode 100644 index 00000000..cd4bdd74 --- /dev/null +++ b/test/helpers/child-cg-kafka-client.js @@ -0,0 +1,68 @@ +'use strict'; + +var kafka = require('../../'); +var ConsumerGroup = kafka.ConsumerGroup; +var argv = require('optimist').argv; +var topic = argv.topic || 'topic1'; +var options = { + kafkaHost: '127.0.0.1:9092', + autoCommit: true, + fetchMaxWaitMs: 1000, + fetchMaxBytes: 1024 * 1024, + sessionTimeout: 8000, + heartbeatInterval: 250, + retryMinTimeout: 250 +}; +var debug = require('debug')('kafka-node:Child-ConsumerGroup'); + +if (argv.groupId) { + options.groupId = argv.groupId; +} + +if (argv.consumerId) { + options.id = argv.consumerId; +} + +var consumer = new ConsumerGroup(options, [topic]); + +consumer.on('message', function (message) { + var out = { + id: consumer.client.clientId, + message: message + }; + process.send(out); +}); + +consumer.on('error', function (err) { + debug('error', err); +}); + +consumer.on('rebalanced', function () { + debug('%s rebalanced!', consumer.client.clientId); + sendEvent('rebalanced'); +}); + +consumer.on('rebalancing', function () { + debug('%s is rebalancing', consumer.client.clientId); +}); + +function sendEvent (event) { + process.send({ + id: consumer.client.clientId, + event: event + }); +} + +function close (signal) { + return function () { + debug('closing the consumer (%s) [%s].', signal, consumer.client.clientId); + consumer.close(true, function () { + process.exit(); + }); + }; +} + +process.once('SIGINT', close('SIGINT')); +process.once('SIGTERM', close('SIGTERM')); +process.once('SIGABRT', close('SIGABRT')); +process.once('disconnect', close('disconnect')); diff --git a/test/test.rebalance.js b/test/test.rebalance.js index 184341a9..81334318 100644 --- a/test/test.rebalance.js +++ b/test/test.rebalance.js @@ -20,6 +20,10 @@ describe('Integrated Reblance', function () { describe('ConsumerGroup', function () { testRebalance('test/helpers/child-cg', false); }); + + describe('ConsumerGroup using Kafka Client', function () { + testRebalance('test/helpers/child-cg-kafka-client', false); + }); }); function testRebalance (forkPath, checkZkTopic) { From 32f4a16a949b9a3b529958ba610aa1a4d1c77b6e Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Thu, 25 May 2017 17:09:25 -0400 Subject: [PATCH 19/28] remove console.log --- lib/kafkaClient.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index ce3514ba..5fe8a1dc 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -123,7 +123,6 @@ KafkaClient.prototype.connect = function () { logger.debug('loadMetadataForTopics after connect failed', error); return this.emit('error', error); } - console.log(result); this.updateMetadatas(result); this.emit('ready'); }); From 81d1ff23e687d7f989187dfe61bbf283aed444f5 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Thu, 25 May 2017 17:09:56 -0400 Subject: [PATCH 20/28] Call close on test Kafka Clients --- test/test.kafkaClient.js | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index dea6df27..95895e52 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -180,8 +180,14 @@ describe('Kafka Client', function () { }); describe('#connect', function () { + let client; + + afterEach(function (done) { + client.close(done); + }); + it('should connect plaintext', function (done) { - const client = new Client({ + client = new Client({ kafkaHost: 'localhost:9092' }); client.once('error', done); @@ -192,7 +198,7 @@ describe('Kafka Client', function () { }); it('should error when connecting to an invalid host', function (done) { - const client = new Client({ + client = new Client({ connectRetryOptions: { retries: 0 }, @@ -206,7 +212,7 @@ describe('Kafka Client', function () { }); it('should connect SSL', function (done) { - const client = new Client({ + client = new Client({ kafkaHost: 'localhost:9093', sslOptions: { rejectUnauthorized: false From 83eeddb77747b587745a694dffce4e633380c3b1 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Thu, 25 May 2017 17:11:58 -0400 Subject: [PATCH 21/28] formatting --- lib/client.js | 89 +++++++----- lib/consumerGroup.js | 313 ++++++++++++++++++++++++----------------- test/test.rebalance.js | 110 ++++++++++----- 3 files changed, 306 insertions(+), 206 deletions(-) diff --git a/lib/client.js b/lib/client.js index c7c9249a..806f510a 100644 --- a/lib/client.js +++ b/lib/client.js @@ -78,21 +78,19 @@ var Client = function (connectionString, clientId, zkOptions, noAckBatchOptions, util.inherits(Client, events.EventEmitter); Client.prototype.connect = function () { - var zk = this.zk = new Zookeeper(this.connectionString, this.zkOptions); + var zk = (this.zk = new Zookeeper(this.connectionString, this.zkOptions)); var self = this; zk.once('init', function (brokers) { try { self.ready = true; self.brokerMetadata = brokers; self.setupBrokerProfiles(brokers); - Object - .keys(self.brokerProfiles) - .some(function (key, index) { - var broker = self.brokerProfiles[key]; - self.setupBroker(broker.host, broker.port, false, self.brokers); - // Only connect one broker - return !index; - }); + Object.keys(self.brokerProfiles).some(function (key, index) { + var broker = self.brokerProfiles[key]; + self.setupBroker(broker.host, broker.port, false, self.brokers); + // Only connect one broker + return !index; + }); self.emit('ready'); } catch (error) { self.ready = false; @@ -238,7 +236,7 @@ Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeou var innerSet = encodeMessageSet(payload.messages); codec.encode(innerSet, function (err, message) { if (err) return cb(err); - payload.messages = [ new Message(0, attributes, '', message) ]; + payload.messages = [new Message(0, attributes, '', message)]; cb(); }); } @@ -464,11 +462,13 @@ Client.prototype.refreshBrokers = function () { function closeDeadBrokers (brokers) { var deadBrokerKeys = _.difference(Object.keys(brokers), validBrokers); if (deadBrokerKeys.length) { - self.closeBrokers(deadBrokerKeys.map(function (key) { - var broker = brokers[key]; - delete brokers[key]; - return broker; - })); + self.closeBrokers( + deadBrokerKeys.map(function (key) { + var broker = brokers[key]; + delete brokers[key]; + return broker; + }) + ); } } @@ -511,7 +511,9 @@ Client.prototype.send = function (payloads, encoder, decoder, cb) { return; } if (payloads[1].length) { - var topicNames = payloads[1].map(function (p) { return p.topic; }); + var topicNames = payloads[1].map(function (p) { + return p.topic; + }); this.loadMetadataForTopics(topicNames, function (err, resp) { if (err) { return cb(err); @@ -567,13 +569,15 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) { }; Client.prototype.checkMetadatas = function (payloads) { - if (_.isEmpty(this.topicMetadata)) return [ [], payloads ]; + if (_.isEmpty(this.topicMetadata)) return [[], payloads]; // out: [ [metadata exists], [metadata not exists] ] - var out = [ [], [] ]; - payloads.forEach(function (p) { - if (this.hasMetadata(p.topic, p.partition)) out[0].push(p); - else out[1].push(p); - }.bind(this)); + var out = [[], []]; + payloads.forEach( + function (p) { + if (this.hasMetadata(p.topic, p.partition)) out[0].push(p); + else out[1].push(p); + }.bind(this) + ); return out; }; @@ -581,7 +585,7 @@ Client.prototype.hasMetadata = function (topic, partition) { var brokerMetadata = this.brokerMetadata; var leader = this.leaderByPartition(topic, partition); - return (leader !== undefined) && brokerMetadata[leader]; + return leader !== undefined && brokerMetadata[leader]; }; Client.prototype.updateMetadatas = function (metadatas) { @@ -598,19 +602,24 @@ Client.prototype.updateMetadatas = function (metadatas) { }; Client.prototype.removeTopicMetadata = function (topics, cb) { - topics.forEach(function (t) { - if (this.topicMetadata[t]) delete this.topicMetadata[t]; - }.bind(this)); + topics.forEach( + function (t) { + if (this.topicMetadata[t]) delete this.topicMetadata[t]; + }.bind(this) + ); cb(null, topics.length); }; Client.prototype.payloadsByLeader = function (payloads) { - return payloads.reduce(function (out, p) { - var leader = this.leaderByPartition(p.topic, p.partition); - out[leader] = out[leader] || []; - out[leader].push(p); - return out; - }.bind(this), {}); + return payloads.reduce( + function (out, p) { + var leader = this.leaderByPartition(p.topic, p.partition); + out[leader] = out[leader] || []; + out[leader].push(p); + return out; + }.bind(this), + {} + ); }; Client.prototype.leaderByPartition = function (topic, partition) { @@ -633,7 +642,7 @@ Client.prototype.brokerForLeader = function (leader, longpolling) { } } - var broker = _.find(this.brokerProfiles, {id: leader}); + var broker = _.find(this.brokerProfiles, { id: leader }); if (!broker) { return; @@ -738,15 +747,19 @@ Client.prototype.handleReceivedData = function (socket) { var decoder = handlers[0]; var cb = handlers[1]; var result = decoder(resp); - (result instanceof Error) - ? cb.call(this, result) - : cb.call(this, null, result); + result instanceof Error ? cb.call(this, result) : cb.call(this, null, result); buffer.consume(size); if (socket.longpolling) socket.waiting = false; - } else { return; } + } else { + return; + } if (socket.buffer.length) { - setImmediate(function () { this.handleReceivedData(socket); }.bind(this)); + setImmediate( + function () { + this.handleReceivedData(socket); + }.bind(this) + ); } }; diff --git a/lib/consumerGroup.js b/lib/consumerGroup.js index 65bfd217..f5d14827 100644 --- a/lib/consumerGroup.js +++ b/lib/consumerGroup.js @@ -53,7 +53,7 @@ const DEFAULTS = { function ConsumerGroup (memberOptions, topics) { EventEmitter.call(this); const self = this; - this.options = _.defaults((memberOptions || {}), DEFAULTS); + this.options = _.defaults(memberOptions || {}, DEFAULTS); if (!this.options.heartbeatInterval) { this.options.heartbeatInterval = Math.floor(this.options.sessionTimeout / 3); @@ -64,19 +64,28 @@ function ConsumerGroup (memberOptions, topics) { } if (!(this.options.fromOffset in ACCEPTED_FROM_OFFSET)) { - throw new Error(`fromOffset ${this.options.fromOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`); + throw new Error( + `fromOffset ${this.options.fromOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}` + ); } if (!(this.options.outOfRangeOffset in ACCEPTED_FROM_OFFSET)) { - throw new Error(`outOfRangeOffset ${this.options.outOfRangeOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`); + throw new Error( + `outOfRangeOffset ${this.options.outOfRangeOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}` + ); } if (memberOptions.kafkaHost) { memberOptions.clientId = memberOptions.id; this.client = new KafkaClient(memberOptions); } else { - this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk, - memberOptions.batch, memberOptions.ssl); + this.client = new Client( + memberOptions.host, + memberOptions.id, + memberOptions.zk, + memberOptions.batch, + memberOptions.ssl + ); } if (_.isString(topics)) { @@ -145,7 +154,12 @@ function ConsumerGroup (memberOptions, topics) { this.on('offsetOutOfRange', topic => { this.pause(); if (this.options.outOfRangeOffset === 'none') { - this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic.partition}`)); + this.emit( + 'error', + new errors.InvalidConsumerOffsetError( + `Offset out of range for topic "${topic.topic}" partition ${topic.partition}` + ) + ); return; } @@ -153,11 +167,14 @@ function ConsumerGroup (memberOptions, topics) { this.getOffset().fetch([topic], (error, result) => { if (error) { - this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error)); + this.emit( + 'error', + new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error) + ); return; } const offset = _.head(result[topic.topic][topic.partition]); - const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset; + const oldOffset = _.find(this.topicPayloads, { topic: topic.topic, partition: topic.partition }).offset; logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset); @@ -224,23 +241,26 @@ ConsumerGroup.prototype.assignPartitions = function (protocol, groupMembers, cal logger.debug('Assigning Partitions to members', groupMembers); logger.debug('Using group protocol', protocol); - protocol = _.find(this.protocols, {name: protocol}); + protocol = _.find(this.protocols, { name: protocol }); var self = this; var topics = _(groupMembers).map('subscription').flatten().uniq().value(); - async.waterfall([ - function (callback) { - logger.debug('loadingMetadata for topics:', topics); - self.client.loadMetadataForTopics(topics, callback); - }, + async.waterfall( + [ + function (callback) { + logger.debug('loadingMetadata for topics:', topics); + self.client.loadMetadataForTopics(topics, callback); + }, - function (metadataResponse, callback) { - var metadata = mapTopicToPartitions(metadataResponse[1].metadata); - logger.debug('mapTopicToPartitions', metadata); - protocol.assign(metadata, groupMembers, callback); - } - ], callback); + function (metadataResponse, callback) { + var metadata = mapTopicToPartitions(metadataResponse[1].metadata); + logger.debug('mapTopicToPartitions', metadata); + protocol.assign(metadata, groupMembers, callback); + } + ], + callback + ); }; function mapTopicToPartitions (metadata) { @@ -250,7 +270,7 @@ function mapTopicToPartitions (metadata) { ConsumerGroup.prototype.handleJoinGroup = function (joinGroupResponse, callback) { logger.debug('joinGroupResponse %j from %s', joinGroupResponse, this.client.clientId); - this.isLeader = (joinGroupResponse.leaderId === joinGroupResponse.memberId); + this.isLeader = joinGroupResponse.leaderId === joinGroupResponse.memberId; this.generationId = joinGroupResponse.generationId; this.memberId = joinGroupResponse.memberId; @@ -290,65 +310,82 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback) const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions); const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET; - async.waterfall([ - function (callback) { - self.fetchOffset(syncGroupResponse.partitions, callback); - }, - function (offsets, callback) { - logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets); + async.waterfall( + [ + function (callback) { + self.fetchOffset(syncGroupResponse.partitions, callback); + }, + function (offsets, callback) { + logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets); - var noOffset = topicPartitionList.some(function (tp) { - return offsets[tp.topic][tp.partition] === -1; - }); + var noOffset = topicPartitionList.some(function (tp) { + return offsets[tp.topic][tp.partition] === -1; + }); - if (noOffset) { - logger.debug('No saved offsets'); + if (noOffset) { + logger.debug('No saved offsets'); - if (self.options.fromOffset === 'none') { - return callback(new Error(`${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${self.options.groupId}'`)); - } + if (self.options.fromOffset === 'none') { + return callback( + new Error( + `${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${self.options.groupId}'` + ) + ); + } - async.parallel([ - function (callback) { - if (self.migrator) { - return self.migrator.saveHighLevelConsumerOffsets(topicPartitionList, callback); + async.parallel( + [ + function (callback) { + if (self.migrator) { + return self.migrator.saveHighLevelConsumerOffsets(topicPartitionList, callback); + } + callback(null); + }, + function (callback) { + if (useDefaultOffsets) { + return self.saveDefaultOffsets(topicPartitionList, callback); + } + callback(null); + } + ], + function (error) { + if (error) { + return callback(error); + } + logger.debug( + '%s defaultOffset Response for %s: %j', + self.client.clientId, + self.options.fromOffset, + self.defaultOffsets + ); + callback(null, offsets); } - callback(null); - }, - function (callback) { - if (useDefaultOffsets) { - return self.saveDefaultOffsets(topicPartitionList, callback); + ); + } else { + logger.debug('Has saved offsets'); + callback(null, offsets); + } + }, + function (offsets, callback) { + self.topicPayloads = self.buildPayloads(topicPartitionList).map(function (p) { + var offset = offsets[p.topic][p.partition]; + if (offset === -1) { + // -1 means no offset was saved for this topic/partition combo + offset = useDefaultOffsets ? self.getDefaultOffset(p, 0) : 0; + if (self.migrator) { + offset = self.migrator.getOffset(p, offset); } - callback(null); - } - ], function (error) { - if (error) { - return callback(error); } - logger.debug('%s defaultOffset Response for %s: %j', self.client.clientId, self.options.fromOffset, self.defaultOffsets); - callback(null, offsets); + p.offset = offset; + return p; }); - } else { - logger.debug('Has saved offsets'); - callback(null, offsets); + callback(null, true); } - }, - function (offsets, callback) { - self.topicPayloads = self.buildPayloads(topicPartitionList).map(function (p) { - var offset = offsets[p.topic][p.partition]; - if (offset === -1) { // -1 means no offset was saved for this topic/partition combo - offset = useDefaultOffsets ? self.getDefaultOffset(p, 0) : 0; - if (self.migrator) { - offset = self.migrator.getOffset(p, offset); - } - } - p.offset = offset; - return p; - }); - callback(null, true); - } - ], callback); - } else { // no partitions assigned + ], + callback + ); + } else { + // no partitions assigned callback(null, false); } }; @@ -383,53 +420,68 @@ ConsumerGroup.prototype.connect = function () { this.connecting = true; this.emit('rebalancing'); - async.waterfall([ - function (callback) { - if (self.client.coordinatorId) { - return callback(null, null); - } - self.client.sendGroupCoordinatorRequest(self.options.groupId, callback); - }, + async.waterfall( + [ + function (callback) { + if (self.client.coordinatorId) { + return callback(null, null); + } + self.client.sendGroupCoordinatorRequest(self.options.groupId, callback); + }, - function (coordinatorInfo, callback) { - logger.debug('GroupCoordinator Response:', coordinatorInfo); - if (coordinatorInfo) { - self.setCoordinatorId(coordinatorInfo.coordinatorId); - } - self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId), self.options.sessionTimeout, self.protocols, callback); - }, + function (coordinatorInfo, callback) { + logger.debug('GroupCoordinator Response:', coordinatorInfo); + if (coordinatorInfo) { + self.setCoordinatorId(coordinatorInfo.coordinatorId); + } + self.client.sendJoinGroupRequest( + self.options.groupId, + emptyStrIfNull(self.memberId), + self.options.sessionTimeout, + self.protocols, + callback + ); + }, - function (joinGroupResponse, callback) { - self.handleJoinGroup(joinGroupResponse, callback); - }, + function (joinGroupResponse, callback) { + self.handleJoinGroup(joinGroupResponse, callback); + }, - function (groupAssignment, callback) { - logger.debug('SyncGroup Request from %s', self.memberId); - self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId, groupAssignment, callback); - }, + function (groupAssignment, callback) { + logger.debug('SyncGroup Request from %s', self.memberId); + self.client.sendSyncGroupRequest( + self.options.groupId, + self.generationId, + self.memberId, + groupAssignment, + callback + ); + }, - function (syncGroupResponse, callback) { - self.handleSyncGroup(syncGroupResponse, callback); - } - ], function (error, startFetch) { - self.connecting = false; - self.rebalancing = false; - if (error) { - return self.recovery.tryToRecoverFrom(error, 'connect'); - } + function (syncGroupResponse, callback) { + self.handleSyncGroup(syncGroupResponse, callback); + } + ], + function (error, startFetch) { + self.connecting = false; + self.rebalancing = false; + if (error) { + return self.recovery.tryToRecoverFrom(error, 'connect'); + } - self.ready = true; - self.recovery.clearError(); + self.ready = true; + self.recovery.clearError(); - logger.debug('generationId', self.generationId); + logger.debug('generationId', self.generationId); - if (startFetch) { - self.fetch(); + if (startFetch) { + self.fetch(); + } + self.startHeartbeats(); + self.emit('connect'); + self.emit('rebalanced'); } - self.startHeartbeats(); - self.emit('connect'); - self.emit('rebalanced'); - }); + ); }; ConsumerGroup.prototype.scheduleReconnect = function (timeout) { @@ -451,7 +503,7 @@ ConsumerGroup.prototype.startHeartbeats = function () { assert(this.options.sessionTimeout > 0); assert(this.ready, 'consumerGroup is not ready'); - const heartbeatIntervalMs = this.options.heartbeatInterval || (Math.floor(this.options.sessionTimeout / 3)); + const heartbeatIntervalMs = this.options.heartbeatInterval || Math.floor(this.options.sessionTimeout / 3); logger.debug('%s started heartbeats at every %d ms', this.client.clientId, heartbeatIntervalMs); this.stopHeartbeats(); @@ -527,26 +579,29 @@ ConsumerGroup.prototype.close = function (force, cb) { force = false; } - async.series([ - function (callback) { - if (force) { - self.commit(true, callback); - return; - } - callback(null); - }, - function (callback) { - self.leaveGroup(function (error) { - if (error) { - logger.error('Leave group failed with', error); + async.series( + [ + function (callback) { + if (force) { + self.commit(true, callback); + return; } callback(null); - }); - }, - function (callback) { - self.client.close(callback); - } - ], cb); + }, + function (callback) { + self.leaveGroup(function (error) { + if (error) { + logger.error('Leave group failed with', error); + } + callback(null); + }); + }, + function (callback) { + self.client.close(callback); + } + ], + cb + ); }; module.exports = ConsumerGroup; diff --git a/test/test.rebalance.js b/test/test.rebalance.js index 81334318..2f030c2f 100644 --- a/test/test.rebalance.js +++ b/test/test.rebalance.js @@ -143,7 +143,14 @@ function testRebalance (forkPath, checkZkTopic) { if (consumedBy.length >= expectedConsumersConsuming) { verified(); } else { - verified(new Error('Received messages but not by the expected ' + expectedConsumersConsuming + ' consumers: ' + JSON.stringify(consumedBy))); + verified( + new Error( + 'Received messages but not by the expected ' + + expectedConsumersConsuming + + ' consumers: ' + + JSON.stringify(consumedBy) + ) + ); } } }; @@ -189,19 +196,27 @@ function testRebalance (forkPath, checkZkTopic) { sendMessages(messages, done); }); - it('verify one consumer consumes all messages on all partitions after one out of the two consumer is killed', function (done) { + it('verify one consumer consumes all messages on all partitions after one out of the two consumer is killed', function ( + done + ) { var messages = generateMessages(4, 'verify 1 c 1 killed'); var verify = getConsumerVerifier(messages, 3, 1, done); rearer.setVerifier(topic, groupId, verify); - rearer.raise(2, function () { - rearer.kill(1, function () { - sendMessages(messages, done); - }); - }, 500); + rearer.raise( + 2, + function () { + rearer.kill(1, function () { + sendMessages(messages, done); + }); + }, + 500 + ); }); - it('verify two consumer consumes all messages on all partitions after two out of the four consumers are killed right away', function (done) { + it('verify two consumer consumes all messages on all partitions after two out of the four consumers are killed right away', function ( + done + ) { var messages = generateMessages(3, 'verify 4 c 2 killed'); var verify = getConsumerVerifier(messages, 3, 2, done); @@ -213,54 +228,71 @@ function testRebalance (forkPath, checkZkTopic) { }); }); - it('verify three consumer consumes all messages on all partitions after one that is unassigned is killed', function (done) { + it('verify three consumer consumes all messages on all partitions after one that is unassigned is killed', function ( + done + ) { var messages = generateMessages(3, 'verify 2 c 2 killed'); var verify = getConsumerVerifier(messages, 3, 2, done); rearer.setVerifier(topic, groupId, verify); - async.series([ - function (callback) { - rearer.raise(3, callback); - }, - function (callback) { - setTimeout(callback, 1000); - }, - function (callback) { - rearer.raise(1, callback); - }, - function (callback) { - setTimeout(callback, 1000); - }, - function (callback) { - rearer.killFirst(callback); + async.series( + [ + function (callback) { + rearer.raise(3, callback); + }, + function (callback) { + setTimeout(callback, 1000); + }, + function (callback) { + rearer.raise(1, callback); + }, + function (callback) { + setTimeout(callback, 1000); + }, + function (callback) { + rearer.killFirst(callback); + } + ], + function () { + sendMessages(messages, done); } - ], function () { - sendMessages(messages, done); - }); + ); }); - it('verify two consumer consumes all messages on all partitions after two out of the four consumers are killed', function (done) { + it('verify two consumer consumes all messages on all partitions after two out of the four consumers are killed', function ( + done + ) { var messages = generateMessages(3, 'verify 2 c 2 killed'); var verify = getConsumerVerifier(messages, 3, 2, done); rearer.setVerifier(topic, groupId, verify); - rearer.raise(4, function () { - rearer.kill(2, function () { - sendMessages(messages, done); - }); - }, 500); + rearer.raise( + 4, + function () { + rearer.kill(2, function () { + sendMessages(messages, done); + }); + }, + 500 + ); }); - it('verify three consumer consumes all messages on all partitions after three out of the six consumers are killed', function (done) { + it('verify three consumer consumes all messages on all partitions after three out of the six consumers are killed', function ( + done + ) { var messages = generateMessages(3, 'verify 3 c 3 killed'); var verify = getConsumerVerifier(messages, 3, 2, done); rearer.setVerifier(topic, groupId, verify); - rearer.raise(6, function () { - rearer.kill(3, function () { - sendMessages(messages, done); - }); - }, 1000); + rearer.raise( + 6, + function () { + rearer.kill(3, function () { + sendMessages(messages, done); + }); + }, + 1000 + ); }); } From 9c45d3a1e96c20c5c18372db89c1f8afa8a4d673 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Thu, 25 May 2017 17:12:27 -0400 Subject: [PATCH 22/28] add hooks for refreshMetadata for KafkaClient --- lib/client.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/client.js b/lib/client.js index 806f510a..35badc68 100644 --- a/lib/client.js +++ b/lib/client.js @@ -272,6 +272,8 @@ Client.prototype.sendOffsetRequest = function (payloads, cb) { this.send(payloads, encoder, decoder, cb); }; +Client.prototype.refreshBrokerMetadata = function () {}; + Client.prototype.sendGroupRequest = function (encode, decode, requestArgs) { requestArgs = _.values(requestArgs); var cb = requestArgs.pop(); @@ -283,6 +285,7 @@ Client.prototype.sendGroupRequest = function (encode, decode, requestArgs) { var broker = this.brokerForLeader(this.coordinatorId); if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { + this.refreshBrokerMetadata(); return cb(new errors.BrokerNotAvailableError('Broker not available')); } @@ -528,6 +531,7 @@ Client.prototype.send = function (payloads, encoder, decoder, cb) { // check payloads again payloads = self.checkMetadatas(_payloads); if (payloads[1].length) { + this.refreshBrokerMetadata(); return cb(new errors.BrokerNotAvailableError('Could not find the leader')); } @@ -550,6 +554,7 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) { var request = encoder(this.clientId, correlationId, payloads[leader]); var broker = this.brokerForLeader(leader, longpolling); if (!broker || !broker.socket || broker.socket.error || broker.socket.closing || broker.socket.destroyed) { + this.refreshBrokerMetadata(); return cb(new errors.BrokerNotAvailableError('Could not find the leader'), payloads[leader]); } From e832d6d3878cac0941c6f51a9604565e3bc1b203 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Tue, 30 May 2017 15:33:54 -0400 Subject: [PATCH 23/28] Check instance of Client for HighLevelConsumer enforcing only original Client --- lib/highLevelConsumer.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/highLevelConsumer.js b/lib/highLevelConsumer.js index 11884494..a5fe33eb 100644 --- a/lib/highLevelConsumer.js +++ b/lib/highLevelConsumer.js @@ -9,6 +9,7 @@ var errors = require('./errors'); var retry = require('retry'); var logger = require('./logging')('kafka-node:HighLevelConsumer'); var validateConfig = require('./utils').validateConfig; +const KafkaClient = require('./kafkaClient'); var DEFAULTS = { groupId: 'kafka-node-group', @@ -36,6 +37,11 @@ var HighLevelConsumer = function (client, topics, options) { if (!topics) { throw new Error('Must have payloads'); } + + if (client instanceof KafkaClient) { + throw new Error('Client for HighLevelConsumer cannot be an instance of KafkaClient'); + } + this.fetchCount = 0; this.client = client; this.options = _.defaults((options || {}), DEFAULTS); From c952bb9d1e723c7cc586a2d189d081b0982577f9 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Tue, 30 May 2017 15:36:25 -0400 Subject: [PATCH 24/28] Allow closing the client to abort connection retries --- lib/kafkaClient.js | 30 ++++++++++++++++++++++++------ test/mocks/mockSocket.js | 3 +++ test/test.kafkaClient.js | 10 +++++++++- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 5fe8a1dc..29467ee1 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -104,6 +104,12 @@ KafkaClient.prototype.connect = function () { const connect = retry.operation(this.options.connectRetryOptions); connect.attempt(currentAttempt => { + if (this.closing) { + logger.debug('Client is closing abort retry'); + connect.stop(); + return; + } + logger.debug(`Connect attempt ${currentAttempt}`); this.connectToBrokers(this.initialHosts, error => { if (connect.retry(error)) { @@ -150,7 +156,7 @@ KafkaClient.prototype.connectToBrokers = function (hosts, callback) { callback(null); }); }, - () => !broker && index < hosts.length, + () => !this.closing && !broker && index < hosts.length, () => { if (broker) { return callback(null, broker); @@ -158,6 +164,8 @@ KafkaClient.prototype.connectToBrokers = function (hosts, callback) { if (errors.length) { callback(errors.pop()); + } else { + callback(new Error('client is closing?')); } } ); @@ -169,11 +177,17 @@ KafkaClient.prototype.connectToBroker = function (broker, callback) { let connectTimer = null; const onError = error => { + if (socket.closing) { + return; + } clearTimeout(connectTimer); connectTimer = null; socket.closing = true; socket.end(); - delete this.brokers[`${broker.host}:${broker.port}`]; + socket.destroy(); + socket.unref(); + const brokerKey = `${broker.host}:${broker.port}`; + delete this.brokers[brokerKey]; callback(error); }; @@ -193,7 +207,7 @@ KafkaClient.prototype.connectToBroker = function (broker, callback) { connectTimer = setTimeout(function () { logger.debug('Connection timeout error with broker %j', broker); - onError(new TimeoutError(`Connection timeout of ${timeout} exceeded`)); + onError(new TimeoutError(`Connection timeout of ${timeout}ms exceeded`)); }, timeout); }; @@ -367,11 +381,15 @@ KafkaClient.prototype.queueCallback = function (socket, id, data) { }; KafkaClient.prototype.close = function (callback) { + logger.debug('close client'); + this.closing = true; this.closeBrokers(this.brokers); this.closeBrokers(this.longpollingBrokers); - setImmediate(function () { - callback(null); - }); + if (callback) { + setImmediate(function () { + callback(null); + }); + } }; KafkaClient.prototype.createBroker = function (host, port, longpolling) { diff --git a/test/mocks/mockSocket.js b/test/mocks/mockSocket.js index 77d4aca6..c83daf47 100644 --- a/test/mocks/mockSocket.js +++ b/test/mocks/mockSocket.js @@ -4,6 +4,9 @@ var EventEmitter = require('events').EventEmitter; function FakeSocket () { EventEmitter.call(this); + this.destroy = function () {}; + this.unref = function () {}; + this.end = function () { var self = this; setImmediate(function () { diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 95895e52..de3b2667 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -355,13 +355,21 @@ describe('Kafka Client', function () { } }); + const fakeSocket = new FakeSocket(); + + sandbox.spy(fakeSocket, 'destroy'); + sandbox.spy(fakeSocket, 'end'); + sandbox.spy(fakeSocket, 'unref'); + sandbox.stub(client, 'setupBroker').returns({ - socket: new FakeSocket() + socket: fakeSocket }); client.connect(); client.once('error', function (error) { error.should.be.an.instanceOf(TimeoutError); + fakeSocket.closing.should.be.true; + sinon.assert.callOrder(fakeSocket.end, fakeSocket.destroy, fakeSocket.unref); done(); }); From eacefdc17afae0973550c42c1f016d05fd75642b Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Tue, 30 May 2017 16:03:17 -0400 Subject: [PATCH 25/28] logging metadata is too spammy. --- lib/kafkaClient.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 29467ee1..5622b760 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -318,7 +318,7 @@ KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) { KafkaClient.prototype.updateMetadatas = function (metadatas) { assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect'); - logger.debug('updating metadatas to', metadatas); + logger.debug('updating metadatas'); this.setBrokerMetadata(metadatas[0]); Client.prototype.updateMetadatas.call(this, metadatas); }; From 0f52b0f89fe24929a79bc11cd8c8e15d45c3fd5c Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Tue, 30 May 2017 17:10:27 -0400 Subject: [PATCH 26/28] Add KafkaClient test to highLevelProducer tests --- test/test.highlevelProducer.js | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/test/test.highlevelProducer.js b/test/test.highlevelProducer.js index 921c1f14..7a0ca098 100644 --- a/test/test.highlevelProducer.js +++ b/test/test.highlevelProducer.js @@ -4,6 +4,7 @@ var kafka = require('..'); var HighLevelProducer = kafka.HighLevelProducer; var uuid = require('uuid'); var Client = kafka.Client; +var KafkaClient = kafka.KafkaClient; var KeyedMessage = kafka.KeyedMessage; const _ = require('lodash'); const assert = require('assert'); @@ -13,17 +14,29 @@ var host = process.env['KAFKA_TEST_HOST'] || ''; [ { - name: 'PLAINTEXT HighLevelProducer' + name: 'PLAINTEXT HighLevelProducer using KafkaClient', + useKafkaClient: true }, { - name: 'SSL Producer', + name: 'PLAINTEXT HighLevelProducer using Client' + }, + { + name: 'SSL Producer using Client', + sslOptions: { + rejectUnauthorized: false + }, + suiteTimeout: 30000 + }, + { + name: 'SSL Producer using KafkaClient', + useKafkaClient: true, sslOptions: { rejectUnauthorized: false }, suiteTimeout: 30000 } ].forEach(function (testParameters) { - var TOPIC_POSTFIX = '_test_' + Date.now(); + var TOPIC_POSTFIX = '_test_' + uuid.v4(); var EXISTS_TOPIC_3 = '_exists_3' + TOPIC_POSTFIX; var sslOptions = testParameters.sslOptions; @@ -36,7 +49,15 @@ var host = process.env['KAFKA_TEST_HOST'] || ''; this.timeout(suiteTimeout); } var clientId = 'kafka-node-client-' + uuid.v4(); - client = new Client(host, clientId, undefined, undefined, sslOptions); + if (testParameters.useKafkaClient) { + const kafkaHost = 'localhost:' + (sslOptions != null ? '9093' : '9092'); + client = new KafkaClient({ + kafkaHost: kafkaHost, + sslOptions: sslOptions + }); + } else { + client = new Client(host, clientId, undefined, undefined, sslOptions); + } producer = new HighLevelProducer(client); noAckProducer = new HighLevelProducer(client, { requireAcks: 0 }); producerKeyed = new HighLevelProducer(client, { partitionerType: HighLevelProducer.PARTITIONER_TYPES.keyed }); From 60773e3fc117b0903d83a3a071096ef72af94e52 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Tue, 30 May 2017 17:11:19 -0400 Subject: [PATCH 27/28] Update doc with more details about KafkaClient options. --- README.md | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index cbfc8fd3..cd20588d 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.8.1 - [Features](#features) - [Install Kafka](#install-kafka) - [API](#api) + - [KafkaClient](#kafkaclient) - [Client](#client) - [Producer](#producer) - [HighLevelProducer](#highlevelproducer) @@ -47,6 +48,7 @@ Kafka-node is a Node.js client with Zookeeper integration for Apache Kafka 0.8.1 * Manage topic Offsets * SSL connections to brokers (Kafka 0.9+) * Consumer Groups managed by Kafka coordinator (Kafka 0.9+) +* Connect directly to brokers (Kafka 0.9+) # Install Kafka Follow the [instructions](http://kafka.apache.org/documentation.html#quickstart) on the Kafka wiki to build Kafka 0.8 and get a test broker up and running. @@ -61,13 +63,21 @@ New KafkaClient connects directly to Kafka brokers instead of connecting to zook * Kafka **ONLY** no zookeeper * Added request timeout -* Added connection timeout +* Added connection timeout and retry ### Notable differences -* Constructor accepts an options object +* Constructor accepts an single options object (see below) * Unlike the original `Client` `KafkaClient` will not emit socket errors it will do it's best to recover and only emit errors when it has exhausted it's recovery attempts * `ready` event is only emitted after successful connection to a broker and metadata request to that broker +* `Client` uses zookeeper to discover the SSL kafka host/port since we connect directly to the broker this host/port for SSL need to be correct + +### Options +* `kafkaHost` : A string of kafka broker/host combination delimited by comma for example: `kafka-1.us-east-1.myapp.com:9093,kafka-2.us-east-1.myapp.com:9093,kafka-3.us-east-1.myapp.com:9093` default: `localhost:9092`. +* `connectTimeout` : in ms it takes to wait for a successful connection before moving to the next host default: `10000` +* `requestTimeout` : in ms for a kafka request to timeout default: `30000` +* `autoConnect` : automatically connect when KafkaClient is instantiated otherwise you need to manually call `connect` default: `true` +* `connectRetryOptions` : object hash that applies to the initial connection. see [retry](https://www.npmjs.com/package/retry) module for these options. ## Client ### Client(connectionString, clientId, [zkOptions], [noAckBatchOptions], [sslOptions]) @@ -582,7 +592,8 @@ API is very similar to `HighLevelConsumer` since it extends directly from HLC so ```js var options = { - host: 'zookeeper:2181', + host: 'zookeeper:2181', // zookeeper host omit if connecting directly to broker (see kafkaHost below) + kafkaHost: 'broker:9092', // connect directly to kafka broker (instantiates a KafkaClient) zk : undefined, // put client zk settings if you need them (see Client) batch: undefined, // put client batch settings if you need them (see Client) ssl: true, // optional (defaults to false) or tls options hash From b82ada72fa9ede4aca586ef7f222c4031564a209 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Mon, 19 Jun 2017 16:08:10 -0400 Subject: [PATCH 28/28] Override topicExists method in Kafka Client since the previous implementation used zookeeper --- lib/kafkaClient.js | 17 ++++++++++++++-- test/test.kafkaClient.js | 44 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 5622b760..d6bd2d32 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -50,7 +50,6 @@ const KafkaClient = function (options) { this.brokers = {}; this.longpollingBrokers = {}; this.topicMetadata = {}; - this.topicPartitions = {}; this.correlationId = 0; this._socketId = 0; this.cbqueue = {}; @@ -320,7 +319,7 @@ KafkaClient.prototype.updateMetadatas = function (metadatas) { assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect'); logger.debug('updating metadatas'); this.setBrokerMetadata(metadatas[0]); - Client.prototype.updateMetadatas.call(this, metadatas); + this.topicMetadata = metadatas[1].metadata; }; KafkaClient.prototype.brokerForLeader = function (leader, longpolling) { @@ -453,4 +452,18 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) { return new BrokerWrapper(socket, this.noAckBatchOptions); }; +KafkaClient.prototype.topicExists = function (topics, callback) { + this.loadMetadataForTopics([], (error, response) => { + if (error) { + return callback(error); + } + this.updateMetadatas(response); + const missingTopics = _.difference(topics, Object.keys(this.topicMetadata)); + if (missingTopics.length === 0) { + return callback(null); + } + callback(new errors.TopicsNotExistError(missingTopics)); + }); +}; + module.exports = KafkaClient; diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index de3b2667..5f1172de 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -4,10 +4,12 @@ const kafka = require('..'); const Client = kafka.KafkaClient; const sinon = require('sinon'); const TimeoutError = require('../lib/errors/TimeoutError'); +const TopicsNotExistError = require('../lib/errors/TopicsNotExistError'); const BrokerWrapper = require('../lib/wrapper/BrokerWrapper'); const FakeSocket = require('./mocks/mockSocket'); const should = require('should'); const _ = require('lodash'); +const uuid = require('uuid'); describe('Kafka Client', function () { describe('#parseHostList', function () { @@ -376,4 +378,46 @@ describe('Kafka Client', function () { clock.tick(10000); }); }); + + describe('#topicExists', function () { + const createTopic = require('../docker/createTopic'); + let sandbox, client; + + beforeEach(function (done) { + sandbox = sinon.sandbox.create(); + client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); + }); + + afterEach(function (done) { + sandbox.restore(); + client.close(done); + }); + + it('should not yield error when single topic exists', function (done) { + const topic = uuid.v4(); + + createTopic(topic, 1, 1).then(function () { + client.topicExists([topic], done); + }); + }); + + it('should yield error when given group of topics do not exist', function (done) { + sandbox.spy(client, 'loadMetadataForTopics'); + sandbox.spy(client, 'updateMetadatas'); + + const nonExistantTopics = _.times(3, () => uuid.v4()); + + client.topicExists(nonExistantTopics, function (error) { + error.should.be.an.instanceOf(TopicsNotExistError); + sinon.assert.calledOnce(client.updateMetadatas); + sinon.assert.calledWith(client.loadMetadataForTopics, []); + sinon.assert.callOrder(client.loadMetadataForTopics, client.updateMetadatas); + error.topics.should.be.eql(nonExistantTopics); + done(); + }); + }); + }); });