From 8613e074839ebacc17ae9a520a052119df905b58 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Wed, 2 Nov 2016 16:09:13 -0400 Subject: [PATCH 1/2] Fixes #504 uncaught exception trying to write to a destroyed socket --- lib/client.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/client.js b/lib/client.js index b5ed438d..b12de753 100644 --- a/lib/client.js +++ b/lib/client.js @@ -282,7 +282,7 @@ Client.prototype.sendGroupRequest = function (encode, decode, requestArgs) { var request = encode.apply(null, requestArgs); var broker = this.brokerForLeader(this.coordinatorId); - if (!broker || !broker.socket || broker.socket.error) { + if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { return cb(new errors.BrokerNotAvailableError('Broker not available')); } @@ -342,7 +342,7 @@ Client.prototype.loadMetadataForTopics = function (topics, cb) { var request = protocol.encodeMetadataRequest(this.clientId, correlationId, topics); var broker = this.brokerForLeader(); - if (!broker || !broker.socket || broker.socket.error) { + if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { return cb(new errors.BrokerNotAvailableError('Broker not available')); } @@ -534,7 +534,7 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) { var correlationId = this.nextId(); var request = encoder(this.clientId, correlationId, payloads[leader]); var broker = this.brokerForLeader(leader, longpolling); - if (!broker || !broker.socket || broker.socket.error || broker.socket.closing) { + if (!broker || !broker.socket || broker.socket.error || broker.socket.closing || broker.socket.destroyed) { return cb(new errors.BrokerNotAvailableError('Could not find the leader'), payloads[leader]); } @@ -613,7 +613,6 @@ Client.prototype.brokerForLeader = function (leader, longpolling) { } else if (!_.isEmpty(this.brokerMetadata)) { leader = Object.keys(this.brokerMetadata)[0]; } else { - this.emit('error', new errors.BrokerNotAvailableError('Could not find a broker')); return; } } From e23d92ad0ac89bb2487f1f3c35b166aa73d5dc53 Mon Sep 17 00:00:00 2001 From: Xiaoxin Lu Date: Wed, 2 Nov 2016 17:05:57 -0400 Subject: [PATCH 2/2] catch errors emitted from Offset --- lib/consumerGroup.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/consumerGroup.js b/lib/consumerGroup.js index 68064c90..557f106f 100644 --- a/lib/consumerGroup.js +++ b/lib/consumerGroup.js @@ -2,6 +2,7 @@ const debug = require('debug')('kafka-node:ConsumerGroup'); const util = require('util'); +const EventEmitter = require('events'); const highLevelConsumer = require('./highLevelConsumer'); const Client = require('./client'); const Offset = require('./offset'); @@ -46,6 +47,7 @@ const DEFAULTS = { }; function ConsumerGroup (memberOptions, topics) { + EventEmitter.call(this); const self = this; this.options = _.defaults((memberOptions || {}), DEFAULTS); @@ -289,6 +291,8 @@ ConsumerGroup.prototype.getOffset = function () { return this.offset; } this.offset = new Offset(this.client); + // we can ignore this since we are already forwarding error event emitted from client + this.offset.on('error', _.noop); return this.offset; };