diff --git a/README.md b/README.md index 6b7c8635..155c9790 100644 --- a/README.md +++ b/README.md @@ -183,29 +183,34 @@ producer.on('error', function (err) {}) ``` > ⚠️**WARNING**: Batch multiple messages of the same topic/partition together as an array on the `messages` attribute otherwise you may lose messages! -### createTopics(topics, async, cb) -This method is used to create topics on the Kafka server. It only works when `auto.create.topics.enable`, on the Kafka server, is set to true. Our client simply sends a metadata request to the server which will auto create topics. When `async` is set to false, this method does not return until all topics are created, otherwise it returns immediately. +### createTopics(topics, cb) +This method is used to create topics on the Kafka server. It requires Kafka 0.10+. * `topics`: **Array**, array of topics -* `async`: **Boolean**, async or sync * `cb`: **Function**, the callback Example: ``` js -var kafka = require('kafka-node'), - Producer = kafka.Producer, - client = new kafka.Client(), - producer = new Producer(client); -// Create topics sync -producer.createTopics(['t','t1'], false, function (err, data) { - console.log(data); +var kafka = require('kafka-node'); +var client = new kafka.KafkaClient(); + +var topicsToCreate = [{ + topic: 'topic1', + partitions: 1, + replicationFactor: 2 +}, +{ + topic: 'topic2', + partitions: 5, + replicationFactor: 3 +}]; + +client.createTopics(topics, (error, result) => { + // result is an array of any errors if a given topic could not be created }); -// Create topics async -producer.createTopics(['t'], true, function (err, data) {}); -producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg -``` +``` ## HighLevelProducer ### HighLevelProducer(client, [options], [customPartitioner]) diff --git a/lib/admin.js b/lib/admin.js index c368d66b..ca0eff54 100644 --- a/lib/admin.js +++ b/lib/admin.js @@ -42,4 +42,12 @@ Admin.prototype.describeGroups = function (consumerGroups, cb) { this.client.getDescribeGroups(consumerGroups, cb); }; +Admin.prototype.createTopics = function (topics, cb) { + if (!this.ready) { + this.once('ready', () => this.client.createTopics(topics, cb)); + return; + } + this.client.createTopics(topics, cb); +}; + module.exports = Admin; diff --git a/lib/errors/NotControllerError.js b/lib/errors/NotControllerError.js new file mode 100644 index 00000000..1d5ef7a4 --- /dev/null +++ b/lib/errors/NotControllerError.js @@ -0,0 +1,18 @@ +var util = require('util'); + +/** + * The request was sent to a broker that was not the controller. + * + * @param {*} message A message describing the issue. + * + * @constructor + */ +var NotController = function (message) { + Error.captureStackTrace(this, this); + this.message = message; +}; + +util.inherits(NotController, Error); +NotController.prototype.name = 'NotController'; + +module.exports = NotController; diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index d9376278..059364de 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -15,6 +15,7 @@ const BrokerWrapper = require('./wrapper/BrokerWrapper'); const errors = require('./errors'); const validateConfig = require('./utils').validateConfig; const TimeoutError = require('./errors/TimeoutError'); +const NotControllerError = require('./errors/NotControllerError'); const protocol = require('./protocol'); const protocolVersions = require('./protocol/protocolVersions'); const baseProtocolVersions = protocolVersions.baseSupport; @@ -67,6 +68,7 @@ const KafkaClient = function (options) { this._socketId = 0; this.cbqueue = {}; this.brokerMetadata = {}; + this.clusterMetadata = {}; this.ready = false; this.initialHosts = parseHostList(this.options.kafkaHost); @@ -241,6 +243,48 @@ KafkaClient.prototype.connectToBroker = function (broker, callback) { }, timeout); }; +KafkaClient.prototype.getController = function (callback) { + // Check for cached controller + if (this.clusterMetadata.controllerId) { + var controller = this.brokerMetadata[this.clusterMetadata.controllerId]; + var broker = this.getBroker(controller.host, controller.port); + + return callback(null, broker); + } + + // If cached controller is not available, refresh metadata + this.loadMetadata((error, result) => { + if (error) { + return callback(error); + } + + // No controller will be available if api version request timed out, or if kafka version is less than 0.10. + if (!result[1].clusterMetadata || !result[1].clusterMetadata.controllerId) { + return callback(new errors.BrokerNotAvailableError('Controller broker not available')); + } + + this.updateMetadatas(result); + + var controllerId = result[1].clusterMetadata.controllerId; + var controllerMetadata = result[0][controllerId]; + + var broker = this.getBroker(controllerMetadata.host, controllerMetadata.port); + + if (!broker || !broker.isConnected()) { + return callback(new errors.BrokerNotAvailableError('Controller broker not available')); + } + + return callback(null, broker); + }); +}; + +KafkaClient.prototype.getBroker = function (host, port, longpolling) { + const brokers = this.getBrokers(); + + var addr = host + ':' + port; + return brokers[addr] || this.setupBroker(host, port, longpolling, brokers); +}; + KafkaClient.prototype.setupBroker = function (host, port, longpolling, brokers) { var brokerKey = host + ':' + port; brokers[brokerKey] = this.createBroker(host, port, longpolling); @@ -346,6 +390,22 @@ KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) { } }; +KafkaClient.prototype.setClusterMetadata = function (clusterMetadata) { + assert(clusterMetadata, 'clusterMetadata is empty'); + this.clusterMetadata = clusterMetadata; +}; + +KafkaClient.prototype.setControllerId = function (controllerId) { + if (!this.clusterMetadata) { + this.clusterMetadata = { + controllerId + }; + + return; + } + this.clusterMetadata.controllerId = controllerId; +}; + KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadata) { assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect'); logger.debug('updating metadatas'); @@ -355,6 +415,10 @@ KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadat } else { _.extend(this.topicMetadata, metadatas[1].metadata); } + + if (metadatas[1].clusterMetadata) { + this.setClusterMetadata(metadatas[1].clusterMetadata); + } }; KafkaClient.prototype.brokerForLeader = function (leader, longpolling) { @@ -693,6 +757,74 @@ KafkaClient.prototype.clearCallbackQueue = function (socket, error) { delete this.cbqueue[socketId]; }; +/** + * Fetches metadata for brokers and cluster. + * This includes an array containing each node (id, host and port). + * Depending on kafka version, additional cluster information is available (controller id). + * @param {loadMetadataCallback} cb Function to call once metadata is loaded. + */ +KafkaClient.prototype.loadMetadata = function (callback) { + this.loadMetadataForTopics(null, callback); +}; + +/** + * Fetches metadata for brokers and cluster. + * This includes an array containing each node (id, host and port). As well as an object + * containing the topic name, partition, leader number, replica count, and in sync replicas per partition. + * Depending on kafka version, additional cluster information is available (controller id). + * @param {Array} topics List of topics to fetch metadata for. An empty array ([]) will fetch all topics. + * @param {loadMetadataCallback} callback Function to call once metadata is loaded. + */ +KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) { + var broker = this.brokerForLeader(); + + if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { + return callback(new errors.BrokerNotAvailableError('Broker not available')); + } + + const ensureBrokerReady = (broker, cb) => { + if (!broker.isReady()) { + logger.debug('missing apiSupport waiting until broker is ready...'); + this.waitUntilReady(broker, cb); + } else { + cb(null); + } + }; + + async.series([ + cb => { + ensureBrokerReady(broker, cb); + }, + cb => { + var correlationId = this.nextId(); + var supportedCoders = getSupportedForRequestType(broker, 'metadata'); + var request = supportedCoders.encoder(this.clientId, correlationId, topics); + + this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]); + broker.write(request); + } + ], (err, result) => { + callback(err, result[1]); + }); +}; + +/** + * Creates one or more topics. + * @param {Array} topics Array of topics with partition and replication factor to create. + * @param {createTopicsCallback} callback Function to call once operation is completed. + */ +KafkaClient.prototype.createTopics = function (topics, callback) { + // Calls with [string, string, ...] are forwarded to support previous versions + if (topics.every(t => typeof t === 'string')) { + return Client.prototype.createTopics.apply(this, arguments); + } + + const encoder = protocol.encodeCreateTopicRequest; + const decoder = protocol.decodeCreateTopicResponse; + + this.sendControllerRequest(encoder, decoder, [topics, this.options.requestTimeout], callback); +}; + KafkaClient.prototype.topicExists = function (topics, callback) { this.loadMetadataForTopics([], (error, response) => { if (error) { @@ -870,6 +1002,51 @@ KafkaClient.prototype.verifyPayloadsHasLeaders = function (payloads, callback) { }); }; +KafkaClient.prototype.wrapControllerCheckIfNeeded = function (encoder, decoder, encoderArgs, callback) { + if (callback.isControllerWrapper) { + return callback; + } + + var hasBeenInvoked = false; + + const wrappedCallback = (error, result) => { + if (error instanceof NotControllerError) { + this.setControllerId(null); + + if (!hasBeenInvoked) { + hasBeenInvoked = true; + this.sendControllerRequest(encoder, decoder, encoderArgs, wrappedCallback); + return; + } + } + + callback(error, result); + }; + + wrappedCallback.isControllerWrapper = true; + + return wrappedCallback; +}; + +KafkaClient.prototype.sendControllerRequest = function (encoder, decoder, encoderArgs, callback) { + this.getController((error, controller) => { + if (error) { + return callback(error); + } + + const originalArgs = _.clone(encoderArgs); + const originalCallback = callback; + const correlationId = this.nextId(); + encoderArgs.unshift(this.clientId, correlationId); + const request = encoder.apply(null, encoderArgs); + + callback = this.wrapControllerCheckIfNeeded(encoder, decoder, originalArgs, originalCallback); + + this.queueCallback(controller.socket, correlationId, [decoder, callback]); + controller.write(request); + }); +}; + KafkaClient.prototype.sendFetchRequest = function ( consumer, payloads, diff --git a/lib/protocol/protocol.js b/lib/protocol/protocol.js index dfcc6dc9..5bc7672a 100644 --- a/lib/protocol/protocol.js +++ b/lib/protocol/protocol.js @@ -249,7 +249,38 @@ function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, hi } function encodeMetadataRequest (clientId, correlationId, topics) { - var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata); + return _encodeMetadataRequest(clientId, correlationId, topics, 0); +} + +function decodeMetadataResponse (resp) { + return _decodeMetadataResponse(resp, 0); +} + +function encodeMetadataV1Request (clientId, correlationId, topics) { + return _encodeMetadataRequest(clientId, correlationId, topics, 1); +} + +function decodeMetadataV1Response (resp) { + return _decodeMetadataResponse(resp, 1); +} + +function _encodeMetadataRequest (clientId, correlationId, topics, version) { + var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata, version); + + // In version 0 an empty array will fetch all topics. + // In version 1+ a null value (-1) will fetch all topics. An empty array returns no topics. + // This adds support for maintaining version 0 behaviour in client regardless of kafka version ([] = fetch all topics). + if (version > 0 && ((Array.isArray(topics) && topics.length === 0) || topics === null)) { + request.Int32BE(-1); + return encodeRequestWithLength(request.make()); + } + + // Handle case where null is provided but version requested was 0 (not supported). + // Can happen if the api versions requests fails and fallback api support is used. + if (version === 0 && topics === null) { + topics = []; + } + request.Int32BE(topics.length); topics.forEach(function (topic) { request.Int16BE(topic.length).string(topic); @@ -257,16 +288,25 @@ function encodeMetadataRequest (clientId, correlationId, topics) { return encodeRequestWithLength(request.make()); } -function decodeMetadataResponse (resp) { +function _decodeMetadataResponse (resp, version) { var brokers = {}; var out = {}; var topics = {}; + var controllerId = -1; var errors = []; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word32bs('brokerNum') .loop(decodeBrokers) + .tap(function (vars) { + if (version < 1) { + return; + } + + this.word32bs('controllerId'); + controllerId = vars.controllerId; + }) .word32bs('topicNum') .loop(_decodeTopics); @@ -279,6 +319,19 @@ function decodeMetadataResponse (resp) { vars.host = vars.host.toString(); }) .word32bs('port') + .tap(function (vars) { + if (version < 1) { + return; + } + + this.word16bs('rack'); + if (vars.rack === -1) { + vars.rack = ''; + } else { + this.buffer('rack', vars.rack); + vars.rack = vars.rack.toString(); + } + }) .tap(function (vars) { brokers[vars.nodeId] = { nodeId: vars.nodeId, host: vars.host, port: vars.port }; }); @@ -291,6 +344,12 @@ function decodeMetadataResponse (resp) { .tap(function (vars) { this.buffer('topic', vars.topic); vars.topic = vars.topic.toString(); + + if (version < 1) { + return; + } + + this.word8bs('isInternal'); }) .word32bs('partitionNum') .tap(function (vars) { @@ -332,9 +391,77 @@ function decodeMetadataResponse (resp) { if (!_.isEmpty(errors)) out.error = errors; out.metadata = topics; + + if (version > 0) { + out.clusterMetadata = { + controllerId + }; + } + return [brokers, out]; } +function encodeCreateTopicRequest (clientId, correlationId, topics, timeoutMs) { + var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.createTopics, 1); + request.Int32BE(topics.length); + topics.forEach(function (topic) { + request.Int16BE(topic.topic.length).string(topic.topic); + request.Int32BE(topic.partitions); + request.Int16BE(topic.replicationFactor); + request.Int32BE(0); + request.Int32BE(0); + }); + request.Int32BE(timeoutMs); + request.Int8(0); + + return encodeRequestWithLength(request.make()); +} + +function decodeCreateTopicResponse (resp) { + var topicErrorResponses = []; + var error; + + Binary.parse(resp) + .word32bs('size') + .word32bs('correlationId') + .word32bs('topicNum') + .loop(decodeTopics); + + function decodeTopics (end, vars) { + if (vars.topicNum-- === 0) return end(); + + this.word16bs('topic') + .tap(function (vars) { + this.buffer('topic', vars.topic); + vars.topic = vars.topic.toString(); + }) + .word16bs('errorCode') + .word16bs('errorMessage') + .tap(function (vars) { + if (vars.errorCode === 0) { + return; + } + + // Errors that are not related to the actual topic(s) but the entire request + // (like timeout and sending the request to a non-controller) + if (vars.errorCode === 7 || vars.errorCode === 41) { + error = createGroupError(vars.errorCode); + return; + } + + this.buffer('errorMessage', vars.errorMessage); + vars.errorMessage = vars.errorMessage.toString(); + + topicErrorResponses.push({ + topic: vars.topic, + error: vars.errorMessage + }); + }); + } + + return error || topicErrorResponses; +} + function bufferToArray (num, buffer) { var ret = []; for (var i = 0; i < num; i++) { @@ -1293,9 +1420,13 @@ exports.encodeOffsetFetchRequest = encodeOffsetFetchRequest; exports.encodeOffsetFetchV1Request = encodeOffsetFetchV1Request; exports.decodeOffsetFetchResponse = decodeOffsetFetchResponse; exports.decodeOffsetFetchV1Response = decodeOffsetFetchV1Response; - exports.encodeMetadataRequest = encodeMetadataRequest; exports.decodeMetadataResponse = decodeMetadataResponse; +exports.encodeMetadataV1Request = encodeMetadataV1Request; +exports.decodeMetadataV1Response = decodeMetadataV1Response; + +exports.encodeCreateTopicRequest = encodeCreateTopicRequest; +exports.decodeCreateTopicResponse = decodeCreateTopicResponse; exports.encodeProduceRequest = encodeProduceRequest; exports.encodeProduceV1Request = encodeProduceV1Request; diff --git a/lib/protocol/protocolVersions.js b/lib/protocol/protocolVersions.js index 475aaede..d25f3a03 100644 --- a/lib/protocol/protocolVersions.js +++ b/lib/protocol/protocolVersions.js @@ -15,7 +15,10 @@ const API_MAP = { [p.encodeFetchRequestV2, p.decodeFetchResponseV1] ], offset: [[p.encodeOffsetRequest, p.decodeOffsetResponse]], - metadata: [[p.encodeMetadataRequest, p.decodeMetadataResponse]], + metadata: [ + [p.encodeMetadataRequest, p.decodeMetadataResponse], + [p.encodeMetadataV1Request, p.decodeMetadataV1Response] + ], leader: null, stopReplica: null, updateMetadata: null, diff --git a/lib/protocol/protocol_struct.js b/lib/protocol/protocol_struct.js index c0aca9ff..0569cbc6 100644 --- a/lib/protocol/protocol_struct.js +++ b/lib/protocol/protocol_struct.js @@ -52,7 +52,8 @@ var ERROR_CODE = { '28': 'InvalidCommitOffsetSize', '29': 'TopicAuthorizationFailed', '30': 'GroupAuthorizationFailed', - '31': 'ClusterAuthorizationFailed' + '31': 'ClusterAuthorizationFailed', + '41': 'NotController' }; var GROUP_ERROR = { @@ -61,7 +62,8 @@ var GROUP_ERROR = { NotCoordinatorForGroup: require('../errors/NotCoordinatorForGroupError'), GroupLoadInProgress: require('../errors/GroupLoadInProgressError'), UnknownMemberId: require('../errors/UnknownMemberIdError'), - RebalanceInProgress: require('../errors/RebalanceInProgressError') + RebalanceInProgress: require('../errors/RebalanceInProgressError'), + NotController: require('../errors/NotControllerError') }; var REQUEST_TYPE = { diff --git a/test/mocks/mockSocket.js b/test/mocks/mockSocket.js index 442e4b7f..a6bd72d1 100644 --- a/test/mocks/mockSocket.js +++ b/test/mocks/mockSocket.js @@ -15,6 +15,7 @@ function FakeSocket () { this.close = function () {}; this.setKeepAlive = function () {}; this.destroy = function () {}; + this.write = function () {}; } util.inherits(FakeSocket, EventEmitter); diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index d4abb229..b6107a8a 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -5,6 +5,7 @@ const Client = kafka.KafkaClient; const sinon = require('sinon'); const TimeoutError = require('../lib/errors/TimeoutError'); const TopicsNotExistError = require('../lib/errors/TopicsNotExistError'); +const NotControllerError = require('../lib/errors/NotControllerError'); const BrokerWrapper = require('../lib/wrapper/BrokerWrapper'); const FakeSocket = require('./mocks/mockSocket'); const should = require('should'); @@ -839,4 +840,262 @@ describe('Kafka Client', function () { }); }); }); + + describe('#createTopics', function () { + let client; + + beforeEach(function (done) { + if (process.env.KAFKA_VERSION === '0.9') { + return this.skip(); + } + + client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); + }); + + afterEach(function (done) { + client.close(done); + }); + + it('should create given topics', function (done) { + const topic1 = uuid.v4(); + const topic1ReplicationFactor = 1; + const topic1Partitions = 5; + const topic2 = uuid.v4(); + const topic2ReplicationFactor = 1; + const topic2Partitions = 1; + + client.createTopics([ + { + topic: topic1, + partitions: topic1Partitions, + replicationFactor: topic1ReplicationFactor + }, + { + topic: topic2, + partitions: topic2Partitions, + replicationFactor: topic2ReplicationFactor + } + ], (error, result) => { + should.not.exist(error); + result.should.be.empty; + + // Verify topics were properly created with partitions + replication factor by fetching metadata again + const verifyPartitions = (topicMetadata, expectedPartitionCount, expectedReplicatonfactor) => { + for (let i = 0; i < expectedPartitionCount; i++) { + topicMetadata[i].partition.should.be.exactly(i); + topicMetadata[i].replicas.length.should.be.exactly(expectedReplicatonfactor); + } + }; + + client.loadMetadataForTopics([topic1, topic2], (error, result) => { + should.not.exist(error); + verifyPartitions(result[1].metadata[topic1], topic1Partitions, topic1ReplicationFactor); + verifyPartitions(result[1].metadata[topic2], topic2Partitions, topic2ReplicationFactor); + done(); + }); + }); + }); + + it('should return topic creation errors', function (done) { + const topic = uuid.v4(); + // Only 1 broker is available under test, so a replication factor > 1 is not possible + const topicReplicationFactor = 2; + const topicPartitions = 5; + + client.createTopics([ + { + topic: topic, + partitions: topicPartitions, + replicationFactor: topicReplicationFactor + } + ], (error, result) => { + should.not.exist(error); + result.should.have.length(1); + result[0].topic.should.be.exactly(topic); + result[0].error.toLowerCase().should.startWith('replication factor: 2 larger than available brokers: 1'); + done(); + }); + }); + }); + + describe('#wrapControllerCheckIfNeeded', function () { + let client, sandbox; + + beforeEach(function (done) { + if (process.env.KAFKA_VERSION === '0.9') { + return this.skip(); + } + + sandbox = sinon.sandbox.create(); + client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); + }); + + afterEach(function (done) { + sandbox.restore(); + client.close(done); + }); + + it('should not wrap again if already wrapped', function () { + const fn = _.noop; + + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fn); + const secondWrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], wrapped); + + wrapped.should.be.exactly(secondWrapped); + }); + + it('should wrap if not already wrapped', function () { + const fn = _.noop; + + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fn); + + wrapped.should.not.be.exactly(fn); + }); + + it('should set controller id to null if NotControllerError was returned once', function () { + const fn = _.noop; + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fn); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + sandbox.stub(client, 'sendControllerRequest'); + + wrapped(new NotControllerError('not controller')); + + sinon.assert.calledOnce(setControllerIdSpy); + sinon.assert.alwaysCalledWithExactly(setControllerIdSpy, null); + }); + + it('should send controller request again if NotControllerError was returned once', function () { + var encoder = () => undefined; + var decoder = () => undefined; + var args = []; + const fn = _.noop; + const wrapped = client.wrapControllerCheckIfNeeded(encoder, decoder, args, fn); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + const sendControllerRequestSpy = sandbox.stub(client, 'sendControllerRequest'); + + wrapped(new NotControllerError('not controller')); + + sinon.assert.calledOnce(setControllerIdSpy); + sinon.assert.alwaysCalledWithExactly(setControllerIdSpy, null); + sinon.assert.calledOnce(sendControllerRequestSpy); + sinon.assert.alwaysCalledWithExactly(sendControllerRequestSpy, encoder, decoder, args, wrapped); + }); + + it('should set controller id to null and call original callback if NotControllerError was returned on second try', function () { + const fnSpy = sandbox.spy(); + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fnSpy); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + sandbox.stub(client, 'sendControllerRequest'); + + wrapped(new NotControllerError('not controller')); + wrapped(new NotControllerError('not controller')); + + sinon.assert.calledTwice(setControllerIdSpy); + sinon.assert.alwaysCalledWithExactly(setControllerIdSpy, null); + sinon.assert.calledOnce(fnSpy); + }); + + it('should call original callback if another error was returned', function () { + const fnSpy = sandbox.spy(); + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fnSpy); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + + wrapped(new TimeoutError('operation timed out')); + + sinon.assert.notCalled(setControllerIdSpy); + sinon.assert.calledOnce(fnSpy); + }); + + it('should call original callback if no error was returned', function () { + const fnSpy = sandbox.spy(); + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fnSpy); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + const expectedResult = []; + + wrapped(null, expectedResult); + + sinon.assert.notCalled(setControllerIdSpy); + sinon.assert.calledOnce(fnSpy); + sinon.assert.alwaysCalledWith(fnSpy, null, expectedResult); + }); + }); + + describe('#sendControllerRequest', function () { + let client, sandbox; + + beforeEach(function (done) { + if (process.env.KAFKA_VERSION === '0.9') { + return this.skip(); + } + + sandbox = sinon.sandbox.create(); + client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); + }); + + afterEach(function (done) { + sandbox.restore(); + client.close(done); + }); + + it('should wrap callback', function () { + const fakeBroker = new BrokerWrapper(new FakeSocket()); + sandbox.stub(client, 'getController').yields(null, fakeBroker); + sandbox.stub(client, 'queueCallback'); + const wrapControllerSpy = sandbox.spy(client, 'wrapControllerCheckIfNeeded'); + const callbackSpy = sandbox.spy(); + + client.sendControllerRequest(_.noop, _.noop, [], callbackSpy); + + sinon.assert.calledOnce(wrapControllerSpy); + }); + + it('should be called twice when NotController error was returned', function () { + const fakeBroker = new BrokerWrapper(new FakeSocket()); + sandbox.stub(client, 'getController').yields(null, fakeBroker); + sandbox.stub(client, 'queueCallback').callsFake((socket, correlationId, args) => { + args[1](new NotControllerError('not controller')); + }); + const callbackSpy = sandbox.spy(); + const sendControllerRequestSpy = sandbox.spy(client, 'sendControllerRequest'); + + client.sendControllerRequest(_.noop, _.noop, [], callbackSpy); + + sinon.assert.calledTwice(sendControllerRequestSpy); + }); + + it('should call encoder and queue callback', function () { + const fakeBroker = new BrokerWrapper(new FakeSocket()); + sandbox.stub(client, 'getController').yields(null, fakeBroker); + const queueCallbackSpy = sandbox.stub(client, 'queueCallback'); + const encoder = sandbox.spy(); + const decoder = _.noop; + const args = []; + const callback = _.noop; + + client.sendControllerRequest(encoder, decoder, args, callback); + + sinon.assert.calledOnce(encoder); + sinon.assert.calledOnce(queueCallbackSpy); + }); + + it('should return error if controller request fails', function () { + const error = new TimeoutError('operation timed out'); + sandbox.stub(client, 'getController').yields(error); + const callbackSpy = sandbox.spy(); + + client.sendControllerRequest(null, null, null, callbackSpy); + + sinon.assert.calledOnce(callbackSpy); + sinon.assert.alwaysCalledWithExactly(callbackSpy, error); + }); + }); });