Skip to content

Commit

Permalink
Merge pull request #262 from carlessistare/batch-async
Browse files Browse the repository at this point in the history
Buffer batch for async producers
  • Loading branch information
haio committed Jan 8, 2016
2 parents 9405c86 + c314a79 commit 4d51e95
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 9 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ Follow the [instructions](http://kafka.apache.org/documentation.html#quickstart)

# API
## Client
### Client(connectionString, clientId, [zkOptions])
### Client(connectionString, clientId, [zkOptions], [noAckBatchOptions])
* `connectionString`: Zookeeper connection string, default `localhost:2181/`
* `clientId`: This is a user-supplied identifier for the client application, default `kafka-node-client`
* `zkOptions`: **Object**, Zookeeper options, see [node-zookeeper-client](https://github.com/alexguan/node-zookeeper-client#client-createclientconnectionstring-options)
* `noAckBatchOptions`: **Object**, batch settings used when `requireAcks` is disabled on the Producer side: `noAckBatchSize` in bytes and `noAckBatchAge` in milliseconds, for example `{ noAckBatchSize: 500, noAckBatchAge: 300 }`. When omitted, messages are not batched and each request is written to the broker immediately.

### close(cb)
Closes the connection to Zookeeper and the brokers so that the node process can exit gracefully.
Expand Down
55 changes: 55 additions & 0 deletions lib/batch/KafkaBuffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
'use strict';

/**
 * Accumulates chunks into a single batch and notifies the caller when the
 * batch should be flushed: either because it grew past a size threshold or
 * because no new chunk arrived within the maximum age.
 *
 * @param {Number} batch_size Flush as soon as the batch holds more than this many bytes.
 *                            When null/undefined every chunk is flushed immediately.
 * @param {Number} batch_age  Milliseconds to wait after the most recent chunk before flushing.
 *                            When null/undefined every chunk is flushed immediately.
 * @constructor
 */
var KafkaBuffer = function (batch_size, batch_age) {
  this._batch_size = batch_size;
  this._batch_age = batch_age;
  this._batch_age_timer = null;
  this._buffer = null;
};

/**
 * Appends a chunk to the current batch and decides whether to flush.
 *
 * @param {Buffer} buffer     Data to append; the first chunk of a batch is copied.
 * @param {Function} [callback] Invoked with no arguments when the batch should be
 *                            flushed (synchronously on size overflow or when batching
 *                            is disabled, otherwise once the age timer expires).
 *                            When omitted the chunk is only accumulated.
 */
KafkaBuffer.prototype.addChunk = function (buffer, callback) {
  if (this._buffer == null) {
    // Buffer.from replaces the deprecated `new Buffer(x)`, which would hand
    // back uninitialised memory if a number slipped through.
    this._buffer = Buffer.from(buffer);
  } else {
    this._buffer = Buffer.concat([this._buffer, buffer]);
  }

  if (callback == null) {
    return;
  }

  var batchingDisabled = this._batch_size == null || this._batch_age == null;
  if (batchingDisabled || this._buffer.length > this._batch_size) {
    // The batch is going out right now, so a pending age timer is obsolete.
    this._clearTimer();
    callback();
  } else {
    this._setupTimer(callback);
  }
};

/**
 * Cancels the pending age timer, if any.
 */
KafkaBuffer.prototype._clearTimer = function () {
  if (this._batch_age_timer != null) {
    clearTimeout(this._batch_age_timer);
    this._batch_age_timer = null;
  }
};

/**
 * (Re)arms the age timer so that `callback` fires `batch_age` milliseconds
 * after the most recent chunk, provided the batch is still non-empty.
 *
 * @param {Function} callback Flush notification, invoked with no arguments.
 */
KafkaBuffer.prototype._setupTimer = function (callback) {
  var self = this;

  this._clearTimer();

  this._batch_age_timer = setTimeout(function () {
    self._batch_age_timer = null;
    if (self._buffer && self._buffer.length > 0) {
      callback();
    }
  }, this._batch_age);
};

/**
 * @returns {Buffer|null} The accumulated batch, or null when nothing is buffered.
 */
KafkaBuffer.prototype.getBatch = function () {
  return this._buffer;
};

/**
 * Discards the accumulated batch.
 */
KafkaBuffer.prototype.truncateBatch = function () {
  this._buffer = null;
};

module.exports = KafkaBuffer;
22 changes: 14 additions & 8 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var net = require('net'),
Binary = require('binary'),
getCodec = require('./codec'),
protocol = require('./protocol'),
BrokerWrapper = require('./wrapper/BrokerWrapper'),
encodeMessageSet = protocol.encodeMessageSet,
Message = protocol.Message,
zk = require('./zookeeper'),
Expand All @@ -30,16 +31,21 @@ var net = require('net'),
* @param {String} [clientId='kafka-node-client'] The client id to register with zookeeper, helpful for debugging
* @param {Object} zkOptions Pass through options to the zookeeper client library
*
* @param {Object} noAckBatchOptions Batch buffer options for no ACK requirement producers
* - noAckBatchOptions.noAckBatchSize Max batch size in bytes for the buffer before sending all data to broker
* - noAckBatchOptions.noAckBatchAge Maximum time in milliseconds the buffer retains data before sending all data to broker
*
* @constructor
*/
var Client = function (connectionString, clientId, zkOptions) {
var Client = function (connectionString, clientId, zkOptions, noAckBatchOptions) {
if (this instanceof Client === false) {
return new Client(connectionString, clientId);
}

this.connectionString = connectionString || 'localhost:2181/';
this.clientId = clientId || 'kafka-node-client';
this.zkOptions = zkOptions;
this.noAckBatchOptions = noAckBatchOptions;
this.brokers = {};
this.longpollingBrokers = {};
this.topicMetadata = {};
Expand Down Expand Up @@ -99,8 +105,8 @@ Client.prototype.close = function (cb) {

Client.prototype.closeBrokers = function (brokers) {
_.each(brokers, function (broker) {
broker.closing = true;
broker.end();
broker.socket.closing = true;
broker.socket.end();
});
};

Expand Down Expand Up @@ -225,11 +231,11 @@ Client.prototype.loadMetadataForTopics = function (topics, cb) {
var request = protocol.encodeMetadataRequest(this.clientId, correlationId, topics);
var broker = this.brokerForLeader();

if (!broker || broker.error) {
if (!broker.socket || broker.socket.error) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}

this.queueCallback(broker, correlationId, [protocol.decodeMetadataResponse, cb]);
this.queueCallback(broker.socket, correlationId, [protocol.decodeMetadataResponse, cb]);
broker.write(request);
};

Expand Down Expand Up @@ -417,10 +423,10 @@ Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
}

if (decoder.requireAcks == 0) {
broker.write(request);
broker.writeAsync(request);
cb(null, { result: 'no ack' });
} else {
this.queueCallback(broker, correlationId, [decoder, cb]);
this.queueCallback(broker.socket, correlationId, [decoder, cb]);
broker.write(request);
}
}
Expand Down Expand Up @@ -547,7 +553,7 @@ Client.prototype.createBroker = function connect(host, port, longpolling) {
s.connect(s.port, s.host);
}, 1000);
}
return socket;
return new BrokerWrapper(socket, this.noAckBatchOptions);
};

Client.prototype.handleReceivedData = function (socket) {
Expand Down
14 changes: 14 additions & 0 deletions lib/wrapper/BrokerReadable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict';

var util = require('util'),
Readable = require('stream').Readable;

/**
 * Readable stream whose data is supplied from outside through push().
 *
 * @param {Object} [options] Passed straight through to stream.Readable
 * @constructor
 */
function BrokerReadable(options) {
  Readable.call(this, options);
}

util.inherits(BrokerReadable, Readable);

/**
 * Deliberately empty: this stream never pulls from an underlying source.
 *
 * @param {Number} size Ignored
 */
BrokerReadable.prototype._read = function (size) {};

module.exports = BrokerReadable;
26 changes: 26 additions & 0 deletions lib/wrapper/BrokerTransform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';

var util = require('util'),
Transform = require('stream').Transform,
KafkaBuffer = require('../batch/KafkaBuffer');

/**
 * Transform stream that groups incoming chunks into batches using a
 * KafkaBuffer and emits each batch downstream as a single chunk.
 *
 * @param {Object} [options] Passed through to stream.Transform; additionally read for:
 *  - options.noAckBatchSize Size threshold handed to the KafkaBuffer
 *  - options.noAckBatchAge  Age threshold handed to the KafkaBuffer
 *  Both default to null when `options` is omitted.
 * @constructor
 */
var BrokerTransform = function (options) {
  Transform.call(this, options);
  this.noAckBatchSize = options ? options.noAckBatchSize : null;
  this.noAckBatchAge = options ? options.noAckBatchAge : null;
  this._KafkaBuffer = new KafkaBuffer(this.noAckBatchSize, this.noAckBatchAge);
};

util.inherits(BrokerTransform, Transform);

/**
 * Hands the chunk to the batch buffer. The buffer decides when the batch is
 * emitted, so the chunk is acknowledged right away.
 */
BrokerTransform.prototype._transform = function (chunk, enc, done) {
  this._KafkaBuffer.addChunk(chunk, this._transformNext.bind(this));
  done();
};

/**
 * Emits whatever is still buffered when the writable side ends, so a batch
 * waiting for its flush notification is neither lost nor pushed after EOF.
 */
BrokerTransform.prototype._flush = function (done) {
  this._transformNext();
  done();
};

/**
 * Pushes the current batch downstream and resets the buffer.
 */
BrokerTransform.prototype._transformNext = function () {
  var batch = this._KafkaBuffer.getBatch();
  this._KafkaBuffer.truncateBatch();

  // push(null) would signal end-of-stream, so never forward an empty batch.
  if (batch != null && batch.length > 0) {
    this.push(batch);
  }
};

module.exports = BrokerTransform;
39 changes: 39 additions & 0 deletions lib/wrapper/BrokerWrapper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';

var BrokerReadable = require('./BrokerReadable'),
BrokerTransform = require('./BrokerTransform');

/**
 * Pairs a broker socket with a batching pipeline.
 *
 * Buffers handed to writeAsync() go through a BrokerReadable piped into a
 * BrokerTransform; every batch the transform emits is written to the socket.
 *
 * @param {Object} socket Socket-like object exposing write(buffer)
 * @param {Object} [noAckBatchOptions] Forwarded to the BrokerTransform
 * @constructor
 */
function BrokerWrapper(socket, noAckBatchOptions) {
  var wrapper = this;
  var source = new BrokerReadable();
  var batcher = new BrokerTransform(noAckBatchOptions);

  this.socket = socket;
  this.readableSocket = source;

  source.pipe(batcher);

  // Drain every batch that is ready and forward it to the socket.
  batcher.on('readable', function () {
    var bulkMessage = batcher.read();
    while (bulkMessage) {
      wrapper.socket.write(bulkMessage);
      bulkMessage = batcher.read();
    }
  });
}

/**
 * Writes the buffer to the socket directly, bypassing the batching pipeline.
 *
 * @param {Buffer} buffer Data to send
 */
BrokerWrapper.prototype.write = function (buffer) {
  this.socket.write(buffer);
};

/**
 * Queues the buffer on the batching pipeline instead of writing it directly.
 *
 * @param {Buffer} buffer Data to send
 */
BrokerWrapper.prototype.writeAsync = function (buffer) {
  this.readableSocket.push(buffer);
};

module.exports = BrokerWrapper;
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"devDependencies": {
"mocha": "^2.2.1",
"should": "^5.2.0",
"sinon": "~1.17.2",
"optimist": "^0.6.1"
},
"repository": {
Expand Down
134 changes: 134 additions & 0 deletions test/test.producerBatch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
'use strict';

var sinon = require('sinon'),
kafka = require('..'),
Producer = kafka.Producer,
Client = kafka.Client;

// Batch thresholds handed to the batching client: size in bytes, age in milliseconds.
var BATCH_SIZE = 500;
var BATCH_AGE = 300;

// Topic name made unique per run by appending the current timestamp.
var TOPIC_POSTFIX = ['_test_', Date.now()].join('');
var EXISTS_TOPIC_4 = '_exists_4'.concat(TOPIC_POSTFIX);

// Host taken from the environment; falls back to an empty string.
var host = process.env.KAFKA_TEST_HOST || '';

// Shared fixtures, assigned in the hooks below.
var client;
var producer;
var batchClient;
var batchProducer;
var noAckProducer;

// Builds one plain client and one batching client, each with its own producer,
// then creates the test topic once the plain producer is ready.
before(function (done) {
  var batchOptions = { noAckBatchSize: BATCH_SIZE, noAckBatchAge: BATCH_AGE };

  function onTopicsCreated(err, created) {
    if (err) {
      done(err);
      return;
    }
    // NOTE(review): presumably gives the broker time to settle after topic creation; confirm.
    setTimeout(done, 500);
  }

  client = new Client(host);
  batchClient = new Client(host, null, null, batchOptions);
  producer = new Producer(client);
  batchProducer = new Producer(batchClient);

  producer.on('ready', function () {
    producer.createTopics([EXISTS_TOPIC_4], false, onTopicsCreated);
  });
});

// Checks how requireAcks: 0 producers write to the broker socket, with and
// without batch options configured on the client.
describe('No Ack Producer', function () {

  before(function (done) {
    // Ensure that first message gets the `0`
    producer.send([{ topic: EXISTS_TOPIC_4, messages: '_initial 1' }], function (err, message) {
      // Report the send error itself instead of failing on an undefined `message`.
      if (err) return done(err);
      message.should.be.ok;
      message[EXISTS_TOPIC_4].should.have.property('0', 0);
      batchProducer.send([{ topic: EXISTS_TOPIC_4, messages: '_initial 2' }], function (err, message) {
        if (err) return done(err);
        message.should.be.ok;
        message[EXISTS_TOPIC_4].should.have.property('0', 1);
        done();
      });
    });
  });

  describe('with no batch client', function () {

    before(function (done) {
      noAckProducer = new Producer(client, { requireAcks: 0 });
      done();
    });

    beforeEach(function () {
      // Spy on the raw socket so the tests can see when bytes are written.
      this.sendSpy = sinon.spy(client.brokers[host + ":9092"].socket, 'write');
    });

    afterEach(function () {
      this.sendSpy.restore();
    });

    it('should send message directly', function (done) {
      var self = this;
      noAckProducer.send([{
        topic: EXISTS_TOPIC_4, messages: 'hello kafka no batch'
      }], function (err, message) {
        if (err) return done(err);
        message.result.should.equal('no ack');
        self.sendSpy.args.length.should.be.equal(1);
        self.sendSpy.args[0].toString().should.containEql('hello kafka no batch');
        done();
      });
    });
  });

  describe('with batch client', function () {

    before(function (done) {
      noAckProducer = new Producer(batchClient, { requireAcks: 0 });
      done();
    });

    beforeEach(function () {
      this.sendSpy = sinon.spy(batchClient.brokers[host + ":9092"].socket, 'write');
      // Fake timers let the tests advance the batch-age timeout with clock.tick().
      this.clock = sinon.useFakeTimers();
    });

    afterEach(function () {
      this.sendSpy.restore();
      this.clock.restore();
    });

    // The title is derived from BATCH_AGE so it cannot drift from the value under test.
    it('should wait to send message ' + BATCH_AGE + ' ms', function (done) {
      var self = this;
      noAckProducer.send([{
        topic: EXISTS_TOPIC_4, messages: 'hello kafka with batch'
      }], function (err, message) {
        if (err) return done(err);
        message.result.should.equal('no ack');
        self.sendSpy.args.length.should.be.equal(0);
        // Just before the age limit nothing must have been written yet.
        self.clock.tick(BATCH_AGE - 5);
        self.sendSpy.args.length.should.be.equal(0);
        // Crossing the age limit flushes the batch.
        self.clock.tick(10);
        self.sendSpy.args.length.should.be.equal(1);
        self.sendSpy.args[0].toString().should.containEql('hello kafka with batch');
        done();
      });
    });

    it('should send message once the batch max size is reached', function (done) {
      var self = this;
      // Build a payload longer than BATCH_SIZE so the second send overflows the batch.
      var foo = "";
      for (var i = 0; i < BATCH_SIZE; i++) foo += "X";
      foo += "end of message";
      noAckProducer.send([{
        topic: EXISTS_TOPIC_4, messages: 'hello kafka with batch'
      }], function (err, message) {
        if (err) return done(err);
        message.result.should.equal('no ack');
        self.sendSpy.args.length.should.be.equal(0);
        noAckProducer.send([{
          topic: EXISTS_TOPIC_4, messages: foo
        }], function (err, message) {
          if (err) return done(err);
          message.result.should.equal('no ack');
          // Both messages must go out together in a single socket write.
          self.sendSpy.args.length.should.be.equal(1);
          self.sendSpy.args[0].toString().should.containEql('hello kafka with batch');
          self.sendSpy.args[0].toString().should.containEql('end of message');
          done();
        });
      });
    });

  });

});

0 comments on commit 4d51e95

Please sign in to comment.