From 92b835d1f813585165bb8540938834819bad5166 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Sat, 28 Apr 2018 20:44:49 +0200 Subject: [PATCH 01/13] Add create topics support Adds support for sending a create topics request as a real protocol request. This supports specifying number of partitions and replication factor. Explicit replica assignments and config entries are left out for now. A create topic request must be sent to the controller. To find the current controller, this also adds a version 1 metadata request, which includes the controller Id in the response. --- lib/kafkaClient.js | 48 ++++++++++ lib/protocol/protocol.js | 151 ++++++++++++++++++++++++++++++- lib/protocol/protocolVersions.js | 10 +- test/test.kafkaClient.js | 64 +++++++++++++ 4 files changed, 269 insertions(+), 4 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index d9376278..07ffd1fd 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -241,6 +241,31 @@ KafkaClient.prototype.connectToBroker = function (broker, callback) { }, timeout); }; +KafkaClient.prototype.getController = function (callback) { + this.loadMetadataForTopicsV2([], (error, result) => { + if (error) { + return callback(error); + } + + var controllerId = result[1].clusterMetadata.controllerId; + var controllerMetadata = result[0][controllerId]; + var broker = this.getBroker(controllerMetadata.host, controllerMetadata.port); + + if (!broker || !broker.isConnected()) { + return callback(new errors.BrokerNotAvailableError('Controller broker not available')); + } + + return callback(null, broker); + }); +}; + +KafkaClient.prototype.getBroker = function (host, port, longpolling) { + const brokers = this.getBrokers(); + + var addr = host + ':' + port; + return brokers[addr] || this.setupBroker(host, port, longpolling, brokers); +}; + KafkaClient.prototype.setupBroker = function (host, port, longpolling, brokers) { var brokerKey = host + ':' + port; brokers[brokerKey] = this.createBroker(host, port, longpolling); @@ -693,6 +718,29 @@ KafkaClient.prototype.clearCallbackQueue = function (socket, error) { delete this.cbqueue[socketId]; }; +// Sends a version 1 metadata request which includes controller node in response +KafkaClient.prototype.loadMetadataForTopicsV2 = function (topics, cb) { + var correlationId = this.nextId(); + var request = protocol.encodeMetadataV1Request(this.clientId, correlationId, topics); + var broker = this.brokerForLeader(); + + this.queueCallback(broker.socket, correlationId, [protocol.decodeMetadataV1Response, cb]); + broker.write(request); +}; + +KafkaClient.prototype.createTopicsV2 = function (topics, cb) { + var correlationId = this.nextId(); + var request = protocol.encodeCreateTopicRequest(this.clientId, correlationId, topics, this.options.requestTimeout); + this.getController((error, broker) => { + if (error) { + return cb(error); + } + + this.queueCallback(broker.socket, correlationId, [protocol.decodeCreateTopicResponse, cb]); + broker.write(request); + }); +}; + KafkaClient.prototype.topicExists = function (topics, callback) { this.loadMetadataForTopics([], (error, response) => { if (error) { diff --git a/lib/protocol/protocol.js b/lib/protocol/protocol.js index dfcc6dc9..d51de7cf 100644 --- a/lib/protocol/protocol.js +++ b/lib/protocol/protocol.js @@ -249,7 +249,23 @@ function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, hi } function encodeMetadataRequest (clientId, correlationId, topics) { - var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata); + return _encodeMetadataRequest(clientId, correlationId, topics); +} + +function decodeMetadataResponse (resp) { + return _decodeMetadataResponse(resp, 0); +} + +function encodeMetadataV1Request (clientId, correlationId, topics) { + return _encodeMetadataRequest(clientId, correlationId, topics, 1); +} + +function decodeMetadataV1Response (resp) { + return _decodeMetadataResponse(resp, 1); +} + +function _encodeMetadataRequest (clientId, correlationId, topics, version) { + var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata, version); request.Int32BE(topics.length); topics.forEach(function (topic) { request.Int16BE(topic.length).string(topic); @@ -257,16 +273,25 @@ function encodeMetadataRequest (clientId, correlationId, topics) { return encodeRequestWithLength(request.make()); } -function decodeMetadataResponse (resp) { +function _decodeMetadataResponse (resp, version) { var brokers = {}; var out = {}; var topics = {}; + var controllerId = -1; var errors = []; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word32bs('brokerNum') .loop(decodeBrokers) + .tap(function (vars) { + if (version < 1) { + return; + } + + this.word32bs('controllerId'); + controllerId = vars.controllerId; + }) .word32bs('topicNum') .loop(_decodeTopics); @@ -279,6 +304,19 @@ function decodeMetadataResponse (resp) { vars.host = vars.host.toString(); }) .word32bs('port') + .tap(function (vars) { + if (version < 1) { + return; + } + + this.word16bs('rack'); + if (vars.rack === -1) { + vars.rack = ''; + } else { + this.buffer('rack', vars.rack); + vars.rack = vars.rack.toString(); + } + }) .tap(function (vars) { brokers[vars.nodeId] = { nodeId: vars.nodeId, host: vars.host, port: vars.port }; }); @@ -293,6 +331,13 @@ function decodeMetadataResponse (resp) { vars.topic = vars.topic.toString(); }) .word32bs('partitionNum') + .tap(function (vars) { + if (version < 1) { + return; + } + + this.word8bs('isInternal'); + }) .tap(function (vars) { if (vars.topicError !== 0) { return errors.push(ERROR_CODE[vars.topicError]); @@ -332,9 +377,103 @@ function decodeMetadataResponse (resp) { if (!_.isEmpty(errors)) out.error = errors; out.metadata = topics; + + if (version > 0) { + out.clusterMetadata = { + controllerId + }; + } + return [brokers, out]; } +function encodeCreateTopicRequest (clientId, correlationId, topics, timeoutMs) { + var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.createTopics, 1); + request.Int32BE(topics.length); + topics.forEach(function (topic) { + request.Int16BE(topic.topic.length).string(topic.topic); + request.Int32BE(topic.partitions); + request.Int16BE(topic.replicationFactor); + request.Int32BE(0); + request.Int32BE(0); + }); + request.Int32BE(timeoutMs); + request.Int8(0); + + return encodeRequestWithLength(request.make()); +} + +function decodeCreateTopicResponse (resp) { + var errors = []; + + Binary.parse(resp) + .word32bs('size') + .word32bs('correlationId') + .word32bs('topicNum') + .loop(decodeTopics); + + function decodeTopics (end, vars) { + if (vars.topicNum-- === 0) return end(); + + this.word16bs('topic') + .tap(function (vars) { + this.buffer('topic', vars.topic); + vars.topic = vars.topic.toString(); + }) + .word16bs('errorCode') + .word16bs('errorMessage') + .tap(function (vars) { + if (vars.errorCode === 0) { + return; + } + + const TIMEOUT_ERROR_CODE = 7; + if (vars.errorCode === TIMEOUT_ERROR_CODE) { + vars.errorMessage = 'Received timeout error from broker. The topic may still have been created.'; + } else { + this.buffer('errorMessage', vars.errorMessage); + vars.errorMessage = vars.errorMessage.toString(); + } + + errors.push({ + topic: vars.topic, + error: vars.errorMessage + }); + }); + } + + return errors; +} + +function encodeDeleteTopicsRequest (clientId, correlationId, topics) { + var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.deleteTopics); + request.Int32BE(topics.length); + topics.forEach(function (topic) { + request.Int16BE(topic.length).string(topic); + }); + request.Int32BE(200); + + return encodeRequestWithLength(request.make()); +} + +function decodeDeleteTopicsResponse (resp) { + var res = {}; + Binary.parse(resp) + .word32bs('size') + .word32bs('correlationId') + .word32bs('topicNum') + .word16bs('topic') + .tap(function (vars) { + this.buffer('topic', vars.topic); + vars.topic = vars.topic.toString(); + }) + .word16bs('errorCode') + .tap((vars) => { + console.log(vars); + }); + return res; +} + function bufferToArray (num, buffer) { var ret = []; for (var i = 0; i < num; i++) { @@ -1293,9 +1432,15 @@ exports.encodeOffsetFetchRequest = encodeOffsetFetchRequest; exports.encodeOffsetFetchV1Request = encodeOffsetFetchV1Request; exports.decodeOffsetFetchResponse = decodeOffsetFetchResponse; exports.decodeOffsetFetchV1Response = decodeOffsetFetchV1Response; - exports.encodeMetadataRequest = encodeMetadataRequest; exports.decodeMetadataResponse = decodeMetadataResponse; +exports.encodeMetadataV1Request = encodeMetadataV1Request; +exports.decodeMetadataV1Response = decodeMetadataV1Response; + +exports.encodeCreateTopicRequest = encodeCreateTopicRequest; +exports.decodeCreateTopicResponse = decodeCreateTopicResponse; +exports.encodeDeleteTopicsRequest = encodeDeleteTopicsRequest; +exports.decodeDeleteTopicsResponse = decodeDeleteTopicsResponse; exports.encodeProduceRequest = encodeProduceRequest; exports.encodeProduceV1Request = encodeProduceV1Request; diff --git a/lib/protocol/protocolVersions.js b/lib/protocol/protocolVersions.js index 475aaede..057cc054 100644 --- a/lib/protocol/protocolVersions.js +++ b/lib/protocol/protocolVersions.js @@ -15,7 +15,10 @@ const API_MAP = { [p.encodeFetchRequestV2, p.decodeFetchResponseV1] ], offset: [[p.encodeOffsetRequest, p.decodeOffsetResponse]], - metadata: [[p.encodeMetadataRequest, p.decodeMetadataResponse]], + metadata: [ + [p.encodeMetadataRequest, p.decodeMetadataResponse], + [p.encodeMetadataV1Request, p.decodeMetadataV1Response] + ], leader: null, stopReplica: null, updateMetadata: null, @@ -45,6 +48,11 @@ const API_MAP = { // Since versions API isn't around until 0.10 we need to hardcode the supported API versions for 0.9 here const API_SUPPORTED_IN_KAFKA_0_9 = { + metadata: { + min: 0, + max: 1, + usable: 1 + }, fetch: { min: 0, max: 1, diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index d4abb229..6e6bdfe6 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -839,4 +839,68 @@ describe('Kafka Client', function () { }); }); }); + + describe('#createTopicsV2', function () { + let sandbox, client; + + beforeEach(function (done) { + sandbox = sinon.sandbox.create(); + client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); + }); + + afterEach(function (done) { + sandbox.restore(); + client.close(done); + }); + + it('should create given topics', function (done) { + const topic1 = uuid.v4(); + const topic1ReplicationFactor = 1; + const topic1Partitions = 5; + const topic2 = uuid.v4(); + const topic2ReplicationFactor = 1; + const topic2Partitions = 1; + + client.createTopicsV2([ + { + topic: topic1, + partitions: topic1Partitions, + replicationFactor: topic1ReplicationFactor + }, + { + topic: topic2, + partitions: topic2Partitions, + replicationFactor: topic2ReplicationFactor + } + ], (error, result) => { + should.not.exist(error); + result.should.be.empty; + done(); + }); + }); + + it('should return topic creation errors', function (done) { + const topic = uuid.v4(); + // Only 1 broker is available under test, so a replication factor > 1 is not possible + const topicReplicationFactor = 2; + const topicPartitions = 5; + + client.createTopicsV2([ + { + topic: topic, + partitions: topicPartitions, + replicationFactor: topicReplicationFactor + } + ], (error, result) => { + should.not.exist(error); + result.should.have.length(1); + result[0].topic.should.be.exactly(topic); + result[0].error.should.be.exactly('Replication factor: 2 larger than available brokers: 1.'); + done(); + }); + }); + }); }); From b7e53c5c874c73649e18a8bb060257a456b411b3 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Sat, 28 Apr 2018 21:30:26 +0200 Subject: [PATCH 02/13] Handle different error message in kafka versions Error message changes a bit depending on version. This relaxes the assertion a bit. --- test/test.kafkaClient.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 6e6bdfe6..1c124fc0 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -898,7 +898,7 @@ describe('Kafka Client', function () { should.not.exist(error); result.should.have.length(1); result[0].topic.should.be.exactly(topic); - result[0].error.should.be.exactly('Replication factor: 2 larger than available brokers: 1.'); + result[0].error.toLowerCase().should.startWith('replication factor: 2 larger than available brokers: 1'); done(); }); }); From de03d2d7b13f57c4c863cdabe1205d9d55ec3e1e Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Mon, 30 Apr 2018 19:42:29 +0200 Subject: [PATCH 03/13] Remove metadata api support version metadata v1 is not supported until 0.10 --- lib/protocol/protocolVersions.js | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/protocol/protocolVersions.js b/lib/protocol/protocolVersions.js index 057cc054..d25f3a03 100644 --- a/lib/protocol/protocolVersions.js +++ b/lib/protocol/protocolVersions.js @@ -48,11 +48,6 @@ const API_MAP = { // Since versions API isn't around until 0.10 we need to hardcode the supported API versions for 0.9 here const API_SUPPORTED_IN_KAFKA_0_9 = { - metadata: { - min: 0, - max: 1, - usable: 1 - }, fetch: { min: 0, max: 1, From 1c46c1b28187abe48e2ab42b92c69cffbbc499f7 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Mon, 30 Apr 2018 19:43:57 +0200 Subject: [PATCH 04/13] Return error if controller cannot be found Happens when cluster is is running version less than 0.10 --- lib/kafkaClient.js | 6 ++++++ test/test.kafkaClient.js | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 07ffd1fd..03dba345 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -249,6 +249,12 @@ KafkaClient.prototype.getController = function (callback) { var controllerId = result[1].clusterMetadata.controllerId; var controllerMetadata = result[0][controllerId]; + + if (!controllerMetadata) { + logger.debug('No controller found, likely because version is less than 0.10'); + return callback(new errors.BrokerNotAvailableError('Controller broker not available')); + } + var broker = this.getBroker(controllerMetadata.host, controllerMetadata.port); if (!broker || !broker.isConnected()) { diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 1c124fc0..b7fb8c27 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -844,6 +844,10 @@ describe('Kafka Client', function () { let sandbox, client; beforeEach(function (done) { + if (process.env.KAFKA_VERSION === '0.9') { + this.skip(); + } + sandbox = sinon.sandbox.create(); client = new Client({ kafkaHost: 'localhost:9092' From d91091b232d4d0a1903f57886dcd664ecb7f08f7 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Sat, 5 May 2018 18:02:24 +0200 Subject: [PATCH 05/13] Parse partition metadata correctly for v1 The order resulted in isInternal being used as the number of partitions during decoding. --- lib/protocol/protocol.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/protocol/protocol.js b/lib/protocol/protocol.js index d51de7cf..dd468c3c 100644 --- a/lib/protocol/protocol.js +++ b/lib/protocol/protocol.js @@ -329,15 +329,14 @@ function _decodeMetadataResponse (resp, version) { .tap(function (vars) { this.buffer('topic', vars.topic); vars.topic = vars.topic.toString(); - }) - .word32bs('partitionNum') - .tap(function (vars) { + if (version < 1) { return; } this.word8bs('isInternal'); }) + .word32bs('partitionNum') .tap(function (vars) { if (vars.topicError !== 0) { return errors.push(ERROR_CODE[vars.topicError]); From d90407509102aea2c641be522ecd83508d52faab Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Mon, 21 May 2018 09:54:17 +0200 Subject: [PATCH 06/13] Address PR feedback - Add createTopics alias on admin client - Remove V2 suffix, replacing old versions of createTopics and loadMetadataForTopics on KafkaClient - Add loadMetadata function - Use supported API to resolve metadata request version --- lib/admin.js | 8 +++ lib/kafkaClient.js | 105 ++++++++++++++++++++++++++------ lib/protocol/protocol.js | 69 +++++++++------------ lib/protocol/protocol_struct.js | 3 +- test/test.kafkaClient.js | 6 +- 5 files changed, 129 insertions(+), 62 deletions(-) diff --git a/lib/admin.js b/lib/admin.js index c368d66b..ca0eff54 100644 --- a/lib/admin.js +++ b/lib/admin.js @@ -42,4 +42,12 @@ Admin.prototype.describeGroups = function (consumerGroups, cb) { this.client.getDescribeGroups(consumerGroups, cb); }; +Admin.prototype.createTopics = function (topics, cb) { + if (!this.ready) { + this.once('ready', () => this.client.createTopics(topics, cb)); + return; + } + this.client.createTopics(topics, cb); +}; + module.exports = Admin; diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 03dba345..9c1d9aa4 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -67,6 +67,7 @@ const KafkaClient = function (options) { this._socketId = 0; this.cbqueue = {}; this.brokerMetadata = {}; + this.clusterMetadata = {}; this.ready = false; this.initialHosts = parseHostList(this.options.kafkaHost); @@ -242,19 +243,28 @@ KafkaClient.prototype.connectToBroker = function (broker, callback) { }; KafkaClient.prototype.getController = function (callback) { - this.loadMetadataForTopicsV2([], (error, result) => { + // Check for cached controller + if (this.clusterMetadata.controllerId) { + var controller = this.brokerMetadata[this.clusterMetadata.controllerId]; + var broker = this.getBroker(controller.host, controller.port); + + return callback(null, broker); + } + + // If cached controller is not available, refresh metadata + this.loadMetadata((error, result) => { if (error) { return callback(error); } - var controllerId = result[1].clusterMetadata.controllerId; - var controllerMetadata = result[0][controllerId]; - - if (!controllerMetadata) { - logger.debug('No controller found, likely because version is less than 0.10'); + // No controller will be available if api version request timed out, or if kafka version is less than 0.10. + if (!result[1].clusterMetadata || !result[1].clusterMetadata.controllerId) { return callback(new errors.BrokerNotAvailableError('Controller broker not available')); } + var controllerId = result[1].clusterMetadata.controllerId; + var controllerMetadata = result[0][controllerId]; + var broker = this.getBroker(controllerMetadata.host, controllerMetadata.port); if (!broker || !broker.isConnected()) { @@ -377,6 +387,11 @@ KafkaClient.prototype.setBrokerMetadata = function (brokerMetadata) { } }; +KafkaClient.prototype.setClusterMetadata = function (clusterMetadata) { + assert(clusterMetadata, 'clusterMetadata is empty'); + this.clusterMetadata = clusterMetadata; +}; + KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadata) { assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect'); logger.debug('updating metadatas'); @@ -386,6 +401,10 @@ KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadat } else { _.extend(this.topicMetadata, metadatas[1].metadata); } + + if (metadatas[1].clusterMetadata) { + this.setClusterMetadata(metadatas[1].clusterMetadata); + } }; KafkaClient.prototype.brokerForLeader = function (leader, longpolling) { @@ -724,25 +743,77 @@ KafkaClient.prototype.clearCallbackQueue = function (socket, error) { delete this.cbqueue[socketId]; }; -// Sends a version 1 metadata request which includes controller node in response -KafkaClient.prototype.loadMetadataForTopicsV2 = function (topics, cb) { - var correlationId = this.nextId(); - var request = protocol.encodeMetadataV1Request(this.clientId, correlationId, topics); +/** + * Fetches metadata for brokers and cluster. + * This includes an array containing each node (id, host and port). + * Depending on kafka version, additional cluster information is available (controller id). + * @param {loadMetadataCallback} cb Function to call once metadata is loaded. + */ +KafkaClient.prototype.loadMetadata = function (callback) { + this.loadMetadataForTopics(null, callback); +}; + +/** + * Fetches metadata for brokers and cluster. + * This includes an array containing each node (id, host and port). As well as an object + * containing the topic name, partition, leader number, replica count, and in sync replicas per partition. + * Depending on kafka version, additional cluster information is available (controller id). + * @param {Array} topics List of topics to fetch metadata for. An empty array ([]) will fetch all topics. + * @param {loadMetadataCallback} callback Function to call once metadata is loaded. + */ +KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) { var broker = this.brokerForLeader(); - this.queueCallback(broker.socket, correlationId, [protocol.decodeMetadataV1Response, cb]); - broker.write(request); + if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { + return callback(new errors.BrokerNotAvailableError('Broker not available')); + } + + const ensureBrokerReady = (broker, cb) => { + if (!broker.isReady()) { + logger.debug('missing apiSupport waiting until broker is ready...'); + this.waitUntilReady(broker, cb); + } else { + cb(null); + } + }; + + async.series([ + cb => { + ensureBrokerReady(broker, cb); + }, + cb => { + var correlationId = this.nextId(); + var supportedCoders = getSupportedForRequestType(broker, 'metadata'); + var request = supportedCoders.encoder(this.clientId, correlationId, topics); + + this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]); + broker.write(request); + } + ], (err, result) => { + if (!err) { + this.updateMetadatas(result[1]); + } + + callback(err, result[1]); + }); }; -KafkaClient.prototype.createTopicsV2 = function (topics, cb) { - var correlationId = this.nextId(); - var request = protocol.encodeCreateTopicRequest(this.clientId, correlationId, topics, this.options.requestTimeout); +/** + * Creates one or more topics. + * Requires kafka 0.10 or greater. + * @param {Array} topics Array of topics with partition and replication factor to create. + * @param {createTopicsCallback} callback Function to call once operation is completed. + */ +KafkaClient.prototype.createTopics = function (topics, callback) { this.getController((error, broker) => { if (error) { - return cb(error); + return callback(error); } - this.queueCallback(broker.socket, correlationId, [protocol.decodeCreateTopicResponse, cb]); + const correlationId = this.nextId(); + const request = protocol.encodeCreateTopicRequest(this.clientId, correlationId, topics, this.options.requestTimeout); + + this.queueCallback(broker.socket, correlationId, [protocol.decodeCreateTopicResponse, callback]); broker.write(request); }); }; diff --git a/lib/protocol/protocol.js b/lib/protocol/protocol.js index dd468c3c..42e93f14 100644 --- a/lib/protocol/protocol.js +++ b/lib/protocol/protocol.js @@ -249,7 +249,7 @@ function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, hi } function encodeMetadataRequest (clientId, correlationId, topics) { - return _encodeMetadataRequest(clientId, correlationId, topics); + return _encodeMetadataRequest(clientId, correlationId, topics, 0); } function decodeMetadataResponse (resp) { @@ -266,6 +266,21 @@ function decodeMetadataV1Response (resp) { function _encodeMetadataRequest (clientId, correlationId, topics, version) { var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata, version); + + // In version 0 an empty array will fetch all topics. + // In version 1+ a null value (-1) will fetch all topics. An empty array returns no topics. + // This adds support for maintaining version 0 behaviour in client regardless of kafka version ([] = fetch all topics). + if (version > 0 && (Array.isArray(topics) && topics.length === 0)) { + request.Int32BE(-1); + return encodeRequestWithLength(request.make()); + } + + // Handle case where null is provided but version requested was 0 (not supported). + // Can happen if the api versions requests fails and fallback api support is used. + if (version === 0 && topics === null) { + topics = []; + } + request.Int32BE(topics.length); topics.forEach(function (topic) { request.Int16BE(topic.length).string(topic); @@ -403,7 +418,8 @@ function encodeCreateTopicRequest (clientId, correlationId, topics, timeoutMs) { } function decodeCreateTopicResponse (resp) { - var errors = []; + var topicErrorResponses = []; + var error; Binary.parse(resp) .word32bs('size') @@ -426,51 +442,24 @@ function decodeCreateTopicResponse (resp) { return; } - const TIMEOUT_ERROR_CODE = 7; - if (vars.errorCode === TIMEOUT_ERROR_CODE) { - vars.errorMessage = 'Received timeout error from broker. The topic may still have been created.'; - } else { - this.buffer('errorMessage', vars.errorMessage); - vars.errorMessage = vars.errorMessage.toString(); + // Errors that are not related to the actual topic(s) but the entire request + // (like timeout and sending the request to a non-controller) + if (vars.errorCode === 7 || vars.errorCode === 41) { + error = createGroupError(vars.errorCode); + return; } - errors.push({ + this.buffer('errorMessage', vars.errorMessage); + vars.errorMessage = vars.errorMessage.toString(); + + topicErrorResponses.push({ topic: vars.topic, error: vars.errorMessage }); }); } - return errors; -} - -function encodeDeleteTopicsRequest (clientId, correlationId, topics) { - var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.deleteTopics); - request.Int32BE(topics.length); - topics.forEach(function (topic) { - request.Int16BE(topic.length).string(topic); - }); - request.Int32BE(200); - - return encodeRequestWithLength(request.make()); -} - -function decodeDeleteTopicsResponse (resp) { - var res = {}; - Binary.parse(resp) - .word32bs('size') - .word32bs('correlationId') - .word32bs('topicNum') - .word16bs('topic') - .tap(function (vars) { - this.buffer('topic', vars.topic); - vars.topic = vars.topic.toString(); - }) - .word16bs('errorCode') - .tap((vars) => { - console.log(vars); - }); - return res; + return error || topicErrorResponses; } function bufferToArray (num, buffer) { @@ -1438,8 +1427,6 @@ exports.decodeMetadataV1Response = decodeMetadataV1Response; exports.encodeCreateTopicRequest = encodeCreateTopicRequest; exports.decodeCreateTopicResponse = decodeCreateTopicResponse; -exports.encodeDeleteTopicsRequest = encodeDeleteTopicsRequest; -exports.decodeDeleteTopicsResponse = decodeDeleteTopicsResponse; exports.encodeProduceRequest = encodeProduceRequest; exports.encodeProduceV1Request = encodeProduceV1Request; diff --git a/lib/protocol/protocol_struct.js b/lib/protocol/protocol_struct.js index c0aca9ff..9371f0c4 100644 --- a/lib/protocol/protocol_struct.js +++ b/lib/protocol/protocol_struct.js @@ -52,7 +52,8 @@ var ERROR_CODE = { '28': 'InvalidCommitOffsetSize', '29': 'TopicAuthorizationFailed', '30': 'GroupAuthorizationFailed', - '31': 'ClusterAuthorizationFailed' + '31': 'ClusterAuthorizationFailed', + '41': 'NotController' }; var GROUP_ERROR = { diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index b7fb8c27..20e70606 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -840,7 +840,7 @@ describe('Kafka Client', function () { }); }); - describe('#createTopicsV2', function () { + describe('#createTopics', function () { let sandbox, client; beforeEach(function (done) { @@ -868,7 +868,7 @@ describe('Kafka Client', function () { const topic2ReplicationFactor = 1; const topic2Partitions = 1; - client.createTopicsV2([ + client.createTopics([ { topic: topic1, partitions: topic1Partitions, @@ -892,7 +892,7 @@ describe('Kafka Client', function () { const topicReplicationFactor = 2; const topicPartitions = 5; - client.createTopicsV2([ + client.createTopics([ { topic: topic, partitions: topicPartitions, From 038c903610b40b4d5b2001c84af5586213f0f8c1 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Mon, 21 May 2018 13:05:02 +0200 Subject: [PATCH 07/13] Encode null value properly --- lib/protocol/protocol.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/protocol/protocol.js b/lib/protocol/protocol.js index 42e93f14..5bc7672a 100644 --- a/lib/protocol/protocol.js +++ b/lib/protocol/protocol.js @@ -270,7 +270,7 @@ function _encodeMetadataRequest (clientId, correlationId, topics, version) { // In version 0 an empty array will fetch all topics. // In version 1+ a null value (-1) will fetch all topics. An empty array returns no topics. // This adds support for maintaining version 0 behaviour in client regardless of kafka version ([] = fetch all topics). - if (version > 0 && (Array.isArray(topics) && topics.length === 0)) { + if (version > 0 && ((Array.isArray(topics) && topics.length === 0) || topics === null)) { request.Int32BE(-1); return encodeRequestWithLength(request.make()); } From 2916d70898ca174e221e2ba0974a1bab29dc65e0 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Mon, 21 May 2018 13:05:29 +0200 Subject: [PATCH 08/13] Forward old createTopics calls to previous impl --- lib/kafkaClient.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 9c1d9aa4..7916ddb7 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -790,21 +790,21 @@ KafkaClient.prototype.loadMetadataForTopics = function (topics, callback) { broker.write(request); } ], (err, result) => { - if (!err) { - this.updateMetadatas(result[1]); - } - callback(err, result[1]); }); }; /** * Creates one or more topics. - * Requires kafka 0.10 or greater. * @param {Array} topics Array of topics with partition and replication factor to create. * @param {createTopicsCallback} callback Function to call once operation is completed. */ KafkaClient.prototype.createTopics = function (topics, callback) { + // Calls with [string, string, ...] are forwarded to support previous versions + if (topics.every(t => typeof t === 'string')) { + return Client.prototype.createTopics.apply(this, arguments); + } + this.getController((error, broker) => { if (error) { return callback(error); From 0ec81c84fcf0a1a61f90e60c07ea91f4602c0e84 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Mon, 21 May 2018 13:24:02 +0200 Subject: [PATCH 09/13] Update README with new createTopics function --- README.md | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 6b7c8635..155c9790 100644 --- a/README.md +++ b/README.md @@ -183,29 +183,34 @@ producer.on('error', function (err) {}) ``` > ⚠️**WARNING**: Batch multiple messages of the same topic/partition together as an array on the `messages` attribute otherwise you may lose messages! -### createTopics(topics, async, cb) -This method is used to create topics on the Kafka server. It only works when `auto.create.topics.enable`, on the Kafka server, is set to true. Our client simply sends a metadata request to the server which will auto create topics. When `async` is set to false, this method does not return until all topics are created, otherwise it returns immediately. +### createTopics(topics, cb) +This method is used to create topics on the Kafka server. It requires Kafka 0.10+. * `topics`: **Array**, array of topics -* `async`: **Boolean**, async or sync * `cb`: **Function**, the callback Example: ``` js -var kafka = require('kafka-node'), - Producer = kafka.Producer, - client = new kafka.Client(), - producer = new Producer(client); -// Create topics sync -producer.createTopics(['t','t1'], false, function (err, data) { - console.log(data); +var kafka = require('kafka-node'); +var client = new kafka.KafkaClient(); + +var topicsToCreate = [{ + topic: 'topic1', + partitions: 1, + replicationFactor: 2 +}, +{ + topic: 'topic2', + partitions: 5, + replicationFactor: 3 +}]; + +client.createTopics(topics, (error, result) => { + // result is an array of any errors if a given topic could not be created }); -// Create topics async -producer.createTopics(['t'], true, function (err, data) {}); -producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg -``` +``` ## HighLevelProducer ### HighLevelProducer(client, [options], [customPartitioner]) From 59052d5c947035413531f8189330eb2320dae3c0 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Mon, 21 May 2018 14:09:26 +0200 Subject: [PATCH 10/13] Update cached metadata in getController --- lib/kafkaClient.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 7916ddb7..6ac7a09c 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -262,6 +262,8 @@ KafkaClient.prototype.getController = function (callback) { return callback(new errors.BrokerNotAvailableError('Controller broker not available')); } + this.updateMetadatas(result); + var controllerId = result[1].clusterMetadata.controllerId; var controllerMetadata = result[0][controllerId]; From d039f44f5e73bf7cec7ecebf4daa0f6cf7dedd79 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Tue, 22 May 2018 19:10:20 +0200 Subject: [PATCH 11/13] Remove unused sandbox --- test/test.kafkaClient.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 20e70606..6afcff83 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -841,14 +841,13 @@ describe('Kafka Client', function () { }); describe('#createTopics', function () { - let sandbox, client; + let client; beforeEach(function (done) { if (process.env.KAFKA_VERSION === '0.9') { this.skip(); } - sandbox = sinon.sandbox.create(); client = new Client({ kafkaHost: 'localhost:9092' }); @@ -856,7 +855,6 @@ describe('Kafka Client', function () { }); afterEach(function (done) { - sandbox.restore(); client.close(done); }); From 911e3e20c245557e9e69f3e754feeb0a4b758b54 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Tue, 22 May 2018 19:32:47 +0200 Subject: [PATCH 12/13] Add additional assertion of createTopic test Now fetches metadata again to verify topics were created with requested partition count and replication factor. --- test/test.kafkaClient.js | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 6afcff83..7ae8da9b 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -880,7 +880,21 @@ describe('Kafka Client', function () { ], (error, result) => { should.not.exist(error); result.should.be.empty; - done(); + + // Verify topics were properly created with partitions + replication factor by fetching metadata again + const verifyPartitions = (topicMetadata, expectedPartitionCount, expectedReplicatonfactor) => { + for (let i = 0; i < expectedPartitionCount; i++) { + topicMetadata[i].partition.should.be.exactly(i); + topicMetadata[i].replicas.length.should.be.exactly(expectedReplicatonfactor); + } + }; + + client.loadMetadataForTopics([topic1, topic2], (error, result) => { + should.not.exist(error); + verifyPartitions(result[1].metadata[topic1], topic1Partitions, topic1ReplicationFactor); + verifyPartitions(result[1].metadata[topic2], topic2Partitions, topic2ReplicationFactor); + done(); + }); }); }); From ac6d07e0d423d308d866bd1f29a680d2744ead1e Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Sun, 27 May 2018 13:34:46 +0200 Subject: [PATCH 13/13] Add sendControllerRequest method Wraps the provided callback with retry logic if NotController error was returned. In this case the cached controller is cleared and it is fetched again by getting the latest metadata. --- lib/errors/NotControllerError.js | 18 +++ lib/kafkaClient.js | 70 ++++++++++-- lib/protocol/protocol_struct.js | 3 +- test/mocks/mockSocket.js | 1 + test/test.kafkaClient.js | 181 ++++++++++++++++++++++++++++++- 5 files changed, 261 insertions(+), 12 deletions(-) create mode 100644 lib/errors/NotControllerError.js diff --git a/lib/errors/NotControllerError.js b/lib/errors/NotControllerError.js new file mode 100644 index 00000000..1d5ef7a4 --- /dev/null +++ b/lib/errors/NotControllerError.js @@ -0,0 +1,18 @@ +var util = require('util'); + +/** + * The request was sent to a broker that was not the controller. + * + * @param {*} message A message describing the issue. + * + * @constructor + */ +var NotController = function (message) { + Error.captureStackTrace(this, this); + this.message = message; +}; + +util.inherits(NotController, Error); +NotController.prototype.name = 'NotController'; + +module.exports = NotController; diff --git a/lib/kafkaClient.js b/lib/kafkaClient.js index 6ac7a09c..059364de 100644 --- a/lib/kafkaClient.js +++ b/lib/kafkaClient.js @@ -15,6 +15,7 @@ const BrokerWrapper = require('./wrapper/BrokerWrapper'); const errors = require('./errors'); const validateConfig = require('./utils').validateConfig; const TimeoutError = require('./errors/TimeoutError'); +const NotControllerError = require('./errors/NotControllerError'); const protocol = require('./protocol'); const protocolVersions = require('./protocol/protocolVersions'); const baseProtocolVersions = protocolVersions.baseSupport; @@ -394,6 +395,17 @@ KafkaClient.prototype.setClusterMetadata = function (clusterMetadata) { this.clusterMetadata = clusterMetadata; }; +KafkaClient.prototype.setControllerId = function (controllerId) { + if (!this.clusterMetadata) { + this.clusterMetadata = { + controllerId + }; + + return; + } + this.clusterMetadata.controllerId = controllerId; +}; + KafkaClient.prototype.updateMetadatas = function (metadatas, replaceTopicMetadata) { assert(metadatas && Array.isArray(metadatas) && metadatas.length === 2, 'metadata format is incorrect'); logger.debug('updating metadatas'); @@ -807,17 +819,10 @@ KafkaClient.prototype.createTopics = function (topics, callback) { return Client.prototype.createTopics.apply(this, arguments); } - this.getController((error, broker) => { - if (error) { - return callback(error); - } - - const correlationId = this.nextId(); - const request = protocol.encodeCreateTopicRequest(this.clientId, correlationId, topics, this.options.requestTimeout); + const encoder = protocol.encodeCreateTopicRequest; + const decoder = protocol.decodeCreateTopicResponse; - this.queueCallback(broker.socket, correlationId, [protocol.decodeCreateTopicResponse, callback]); - broker.write(request); - }); + this.sendControllerRequest(encoder, decoder, [topics, this.options.requestTimeout], callback); }; KafkaClient.prototype.topicExists = function (topics, callback) { @@ -997,6 +1002,51 @@ KafkaClient.prototype.verifyPayloadsHasLeaders = function (payloads, callback) { }); }; +KafkaClient.prototype.wrapControllerCheckIfNeeded = function (encoder, decoder, encoderArgs, callback) { + if (callback.isControllerWrapper) { + return callback; + } + + var hasBeenInvoked = false; + + const wrappedCallback = (error, result) => { + if (error instanceof NotControllerError) { + this.setControllerId(null); + + if (!hasBeenInvoked) { + hasBeenInvoked = true; + this.sendControllerRequest(encoder, decoder, encoderArgs, wrappedCallback); + return; + } + } + + callback(error, result); + }; + + wrappedCallback.isControllerWrapper = true; + + return wrappedCallback; +}; + +KafkaClient.prototype.sendControllerRequest = function (encoder, decoder, encoderArgs, callback) { + this.getController((error, controller) => { + if (error) { + return callback(error); + } + + const originalArgs = _.clone(encoderArgs); + const originalCallback = callback; + const correlationId = this.nextId(); + encoderArgs.unshift(this.clientId, correlationId); + const request = encoder.apply(null, encoderArgs); + + callback = this.wrapControllerCheckIfNeeded(encoder, decoder, originalArgs, originalCallback); + + this.queueCallback(controller.socket, correlationId, [decoder, callback]); + controller.write(request); + }); +}; + KafkaClient.prototype.sendFetchRequest = function ( consumer, payloads, diff --git a/lib/protocol/protocol_struct.js b/lib/protocol/protocol_struct.js index 9371f0c4..0569cbc6 100644 --- a/lib/protocol/protocol_struct.js +++ b/lib/protocol/protocol_struct.js @@ -62,7 +62,8 @@ var GROUP_ERROR = { NotCoordinatorForGroup: require('../errors/NotCoordinatorForGroupError'), GroupLoadInProgress: require('../errors/GroupLoadInProgressError'), UnknownMemberId: require('../errors/UnknownMemberIdError'), - RebalanceInProgress: require('../errors/RebalanceInProgressError') + RebalanceInProgress: require('../errors/RebalanceInProgressError'), + NotController: require('../errors/NotControllerError') }; var REQUEST_TYPE = { diff --git a/test/mocks/mockSocket.js b/test/mocks/mockSocket.js index 442e4b7f..a6bd72d1 100644 --- a/test/mocks/mockSocket.js +++ b/test/mocks/mockSocket.js @@ -15,6 +15,7 @@ function FakeSocket () { this.close = function () {}; this.setKeepAlive = function () {}; this.destroy = function () {}; + this.write = function () {}; } util.inherits(FakeSocket, EventEmitter); diff --git a/test/test.kafkaClient.js b/test/test.kafkaClient.js index 7ae8da9b..b6107a8a 100644 --- a/test/test.kafkaClient.js +++ b/test/test.kafkaClient.js @@ -5,6 +5,7 @@ const Client = kafka.KafkaClient; const sinon = require('sinon'); const TimeoutError = require('../lib/errors/TimeoutError'); const TopicsNotExistError = require('../lib/errors/TopicsNotExistError'); +const NotControllerError = require('../lib/errors/NotControllerError'); const BrokerWrapper = require('../lib/wrapper/BrokerWrapper'); const FakeSocket = require('./mocks/mockSocket'); const should = require('should'); @@ -845,7 +846,7 @@ describe('Kafka Client', function () { beforeEach(function (done) { if (process.env.KAFKA_VERSION === '0.9') { - this.skip(); + return this.skip(); } client = new Client({ @@ -919,4 +920,182 @@ describe('Kafka Client', function () { }); }); }); + + describe('#wrapControllerCheckIfNeeded', function () { + let client, sandbox; + + beforeEach(function (done) { + if (process.env.KAFKA_VERSION === '0.9') { + return this.skip(); + } + + sandbox = sinon.sandbox.create(); + client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); + }); + + afterEach(function (done) { + sandbox.restore(); + client.close(done); + }); + + it('should not wrap again if already wrapped', function () { + const fn = _.noop; + + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fn); + const secondWrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], wrapped); + + wrapped.should.be.exactly(secondWrapped); + }); + + it('should wrap if not already wrapped', function () { + const fn = _.noop; + + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fn); + + wrapped.should.not.be.exactly(fn); + }); + + it('should set controller id to null if NotControllerError was returned once', function () { + const fn = _.noop; + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fn); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + sandbox.stub(client, 'sendControllerRequest'); + + wrapped(new NotControllerError('not controller')); + + sinon.assert.calledOnce(setControllerIdSpy); + sinon.assert.alwaysCalledWithExactly(setControllerIdSpy, null); + }); + + it('should send controller request again if NotControllerError was returned once', function () { + var encoder = () => undefined; + var decoder = () => undefined; + var args = []; + const fn = _.noop; + const wrapped = client.wrapControllerCheckIfNeeded(encoder, decoder, args, fn); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + const sendControllerRequestSpy = sandbox.stub(client, 'sendControllerRequest'); + + wrapped(new NotControllerError('not controller')); + + sinon.assert.calledOnce(setControllerIdSpy); + sinon.assert.alwaysCalledWithExactly(setControllerIdSpy, null); + sinon.assert.calledOnce(sendControllerRequestSpy); + sinon.assert.alwaysCalledWithExactly(sendControllerRequestSpy, encoder, decoder, args, wrapped); + }); + + it('should set controller id to null and call original callback if NotControllerError was returned on second try', function () { + const fnSpy = sandbox.spy(); + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fnSpy); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + sandbox.stub(client, 'sendControllerRequest'); + + wrapped(new NotControllerError('not controller')); + wrapped(new NotControllerError('not controller')); + + sinon.assert.calledTwice(setControllerIdSpy); + sinon.assert.alwaysCalledWithExactly(setControllerIdSpy, null); + sinon.assert.calledOnce(fnSpy); + }); + + it('should call original callback if another error was returned', function () { + const fnSpy = sandbox.spy(); + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fnSpy); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + + wrapped(new TimeoutError('operation timed out')); + + sinon.assert.notCalled(setControllerIdSpy); + sinon.assert.calledOnce(fnSpy); + }); + + it('should call original callback if no error was returned', function () { + const fnSpy = sandbox.spy(); + const wrapped = client.wrapControllerCheckIfNeeded(_.noop, _.noop, [], fnSpy); + const setControllerIdSpy = sandbox.spy(client, 'setControllerId'); + const expectedResult = []; + + wrapped(null, expectedResult); + + sinon.assert.notCalled(setControllerIdSpy); + sinon.assert.calledOnce(fnSpy); + sinon.assert.alwaysCalledWith(fnSpy, null, expectedResult); + }); + }); + + describe('#sendControllerRequest', function () { + let client, sandbox; + + beforeEach(function (done) { + if (process.env.KAFKA_VERSION === '0.9') { + return this.skip(); + } + + sandbox = sinon.sandbox.create(); + client = new Client({ + kafkaHost: 'localhost:9092' + }); + client.once('ready', done); + }); + + afterEach(function (done) { + sandbox.restore(); + client.close(done); + }); + + it('should wrap callback', function () { + const fakeBroker = new BrokerWrapper(new FakeSocket()); + sandbox.stub(client, 'getController').yields(null, fakeBroker); + sandbox.stub(client, 'queueCallback'); + const wrapControllerSpy = sandbox.spy(client, 'wrapControllerCheckIfNeeded'); + const callbackSpy = sandbox.spy(); + + client.sendControllerRequest(_.noop, _.noop, [], callbackSpy); + + sinon.assert.calledOnce(wrapControllerSpy); + }); + + it('should be called twice when NotController error was returned', function () { + const fakeBroker = new BrokerWrapper(new FakeSocket()); + sandbox.stub(client, 'getController').yields(null, fakeBroker); + sandbox.stub(client, 'queueCallback').callsFake((socket, correlationId, args) => { + args[1](new NotControllerError('not controller')); + }); + const callbackSpy = sandbox.spy(); + const sendControllerRequestSpy = sandbox.spy(client, 'sendControllerRequest'); + + client.sendControllerRequest(_.noop, _.noop, [], callbackSpy); + + sinon.assert.calledTwice(sendControllerRequestSpy); + }); + + it('should call encoder and queue callback', function () { + const fakeBroker = new BrokerWrapper(new FakeSocket()); + sandbox.stub(client, 'getController').yields(null, fakeBroker); + const queueCallbackSpy = sandbox.stub(client, 'queueCallback'); + const encoder = sandbox.spy(); + const decoder = _.noop; + const args = []; + const callback = _.noop; + + client.sendControllerRequest(encoder, decoder, args, callback); + + sinon.assert.calledOnce(encoder); + sinon.assert.calledOnce(queueCallbackSpy); + }); + + it('should return error if controller request fails', function () { + const error = new TimeoutError('operation timed out'); + sandbox.stub(client, 'getController').yields(error); + const callbackSpy = sandbox.spy(); + + client.sendControllerRequest(null, null, null, callbackSpy); + + sinon.assert.calledOnce(callbackSpy); + sinon.assert.alwaysCalledWithExactly(callbackSpy, error); + }); + }); });