diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index 554ddc222d6f48..e0c72bf1ff954c 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -250,6 +250,11 @@ readable: null end ``` + +#### readable.bytesRead + +The amount of read bytes. If `objectMode` is `true`, the value is 0 always. + #### readable.isPaused() * Return: `Boolean` diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index bdc263d6ef28f4..8cad1f1a37d37f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -83,6 +83,8 @@ function Readable(options) { this._readableState = new ReadableState(options, this); + this.bytesRead = 0; + // legacy this.readable = true; @@ -135,6 +137,7 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) { var e = new Error('stream.unshift() after end event'); stream.emit('error', e); } else { + stream.bytesRead += state.objectMode ? 0 : chunk.length; if (state.decoder && !addToFront && !encoding) chunk = state.decoder.write(chunk); diff --git a/lib/net.js b/lib/net.js index 0e7bb02bb57dff..1faaa5c13bfdfd 100644 --- a/lib/net.js +++ b/lib/net.js @@ -91,7 +91,6 @@ exports._normalizeConnectArgs = normalizeConnectArgs; // called when creating new Socket, or when re-using a closed Socket function initSocketHandle(self) { self.destroyed = false; - self.bytesRead = 0; self._bytesDispatched = 0; self._sockname = null; @@ -515,10 +514,6 @@ function onread(nread, buffer) { // will prevent this from being called again until _read() gets // called again. - // if it's not enough data, we'll just call handle.readStart() - // again right away. - self.bytesRead += nread; - // Optimization: emit the original buffer with end points var ret = self.push(buffer); diff --git a/test/parallel/test-stream2-readable-bytesread.js b/test/parallel/test-stream2-readable-bytesread.js new file mode 100644 index 00000000000000..6f56af2f4e2c3e --- /dev/null +++ b/test/parallel/test-stream2-readable-bytesread.js @@ -0,0 +1,119 @@ +'use strict'; + +require('../common'); +const assert = require('assert'); +const Readable = require('stream').Readable; +const Duplex = require('stream').Duplex; +const Transform = require('stream').Transform; + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + } + }); + + readable._max = 1000; + readable._index = 1; + + var total = 0; + readable.on('data', function(chunk) { + total += chunk.length; + }); + + readable.on('end', function() { + assert.equal(total, readable.bytesRead); + }); +})(); + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + } + }); + + readable._max = 1000; + readable._index = 1; + + var total = 0; + readable.setEncoding('utf8'); + readable.on('data', function(chunk) { + total += Buffer.byteLength(chunk); + }); + + readable.on('end', function() { + assert.equal(total, readable.bytesRead); + }); +})(); + +(function() { + const duplex = new Duplex({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + }, + write: function(chunk, encoding, next) { + next(); + } + }); + + duplex._max = 1000; + duplex._index = 1; + + var total = 0; + duplex.setEncoding('utf8'); + duplex.on('data', function(chunk) { + total += Buffer.byteLength(chunk); + }); + + duplex.on('end', function() { + assert.equal(total, duplex.bytesRead); + }); +})(); + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('{"key":"value"}')); + } + }); + readable._max = 1000; + readable._index = 1; + + const transform = new Transform({ + readableObjectMode : true, + transform: function(chunk, encoding, next) { + next(null, JSON.parse(chunk)); + }, + flush: function(done) { + done(); + } + }); + + var total = 0; + readable.on('data', function(chunk) { + total += chunk.length; + }); + + transform.on('end', function() { + assert.equal(0, transform.bytesRead); + assert.equal(total, readable.bytesRead); + }); + readable.pipe(transform); +})();