Skip to content

Commit

Permalink
Revert "Merge pull request #654 from crzidea/bl"
Browse files Browse the repository at this point in the history
This reverts commit 2ac4153, reversing
changes made to 88abf37.
  • Loading branch information
hyperlink committed May 8, 2017
1 parent ba76d1d commit 728674f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 17 deletions.
20 changes: 10 additions & 10 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var async = require('async');
var retry = require('retry');
var events = require('events');
var errors = require('./errors');
var Binary = require('binary');
var getCodec = require('./codec');
var protocol = require('./protocol');
var BrokerWrapper = require('./wrapper/BrokerWrapper');
Expand All @@ -20,7 +21,6 @@ var url = require('url');
var logger = require('./logging')('kafka-node:Client');
var validateConfig = require('./utils').validateConfig;
var validateKafkaTopics = require('./utils').validateTopicNames;
var BufferList = require('bl');

const MAX_INT32 = 2147483647;

Expand Down Expand Up @@ -694,10 +694,10 @@ Client.prototype.createBroker = function (host, port, longpolling) {
socket.on('end', function () {
retry(this);
});
socket.buffer = new BufferList();
socket.buffer = new Buffer([]);
socket.on('data', function (data) {
socket.buffer.append(data);
self.handleReceivedData(socket);
this.buffer = Buffer.concat([this.buffer, data]);
self.handleReceivedData(this);
});
socket.setKeepAlive(true, 60000);

Expand All @@ -723,12 +723,12 @@ Client.prototype.reconnectBroker = function (oldSocket) {
};

Client.prototype.handleReceivedData = function (socket) {
var buffer = socket.buffer;
var size = buffer.readUInt32BE(0) + 4;
var vars = Binary.parse(socket.buffer).word32bu('size').word32bu('correlationId').vars;
var size = vars.size + 4;
var correlationId = vars.correlationId;

if (buffer.length >= size) {
var resp = buffer.shallowSlice(0, size);
var correlationId = resp.readUInt32BE(4);
if (socket.buffer.length >= size) {
var resp = socket.buffer.slice(0, size);
var handlers = this.unqueueCallback(socket, correlationId);

if (!handlers) return;
Expand All @@ -738,7 +738,7 @@ Client.prototype.handleReceivedData = function (socket) {
(result instanceof Error)
? cb.call(this, result)
: cb.call(this, null, result);
buffer.consume(size);
socket.buffer = socket.buffer.slice(size);
if (socket.longpolling) socket.waiting = false;
} else { return; }

Expand Down
7 changes: 1 addition & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,14 @@
"producer",
"broker"
],
"files": [
"kafka.js",
"logging.js",
"lib"
],
"files": ["kafka.js", "logging.js", "lib"],
"bugs": "https://github.com/SOHU-co/kafka-node/issues",
"version": "1.6.1",
"main": "kafka.js",
"license": "MIT",
"dependencies": {
"async": ">0.9 <2.0",
"binary": "~0.3.0",
"bl": "^1.2.0",
"buffer-crc32": "~0.2.5",
"buffermaker": "~1.2.0",
"debug": "^2.1.3",
Expand Down
1 change: 0 additions & 1 deletion start-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ if [ -z "$TRAVIS" ]; then
DOCKER_VM_IP=`dlite ip`
fi

DOCKER_VM_IP=${DOCKER_VM_IP:-127.0.0.1}
export KAFKA_ADVERTISED_HOST_NAME=$DOCKER_VM_IP
docker-compose down
docker-compose up -d
Expand Down

0 comments on commit 728674f

Please sign in to comment.