From 9a2a4464075c98a44710dac40d326f6875c4bb38 Mon Sep 17 00:00:00 2001 From: Jackson Tian Date: Fri, 18 Dec 2015 11:29:08 +0800 Subject: [PATCH] stream: add bytesRead property for readable Add a bytesRead property for readable is useful in some use cases. When user want know how many bytes read of readable, need to caculate it in userland. If encoding is specificed, get the value is very slowly. --- doc/api/stream.markdown | 5 + lib/_stream_readable.js | 3 + lib/net.js | 5 - .../test-stream2-readable-bytesread.js | 119 ++++++++++++++++++ 4 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream2-readable-bytesread.js diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index 9ab35254bad21b..f242259c80c294 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -252,6 +252,11 @@ readable: null end ``` + +#### readable.bytesRead + +The amount of read bytes. If `objectMode` is `true`, the value is 0 always. + #### readable.isPaused() * Return: `Boolean` diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index bdc263d6ef28f4..8cad1f1a37d37f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -83,6 +83,8 @@ function Readable(options) { this._readableState = new ReadableState(options, this); + this.bytesRead = 0; + // legacy this.readable = true; @@ -135,6 +137,7 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) { var e = new Error('stream.unshift() after end event'); stream.emit('error', e); } else { + stream.bytesRead += state.objectMode ? 0 : chunk.length; if (state.decoder && !addToFront && !encoding) chunk = state.decoder.write(chunk); diff --git a/lib/net.js b/lib/net.js index 672e8fdc798a3a..963642e96be206 100644 --- a/lib/net.js +++ b/lib/net.js @@ -91,7 +91,6 @@ exports._normalizeConnectArgs = normalizeConnectArgs; // called when creating new Socket, or when re-using a closed Socket function initSocketHandle(self) { self.destroyed = false; - self.bytesRead = 0; self._bytesDispatched = 0; self._sockname = null; @@ -515,10 +514,6 @@ function onread(nread, buffer) { // will prevent this from being called again until _read() gets // called again. - // if it's not enough data, we'll just call handle.readStart() - // again right away. - self.bytesRead += nread; - // Optimization: emit the original buffer with end points var ret = self.push(buffer); diff --git a/test/parallel/test-stream2-readable-bytesread.js b/test/parallel/test-stream2-readable-bytesread.js new file mode 100644 index 00000000000000..6f56af2f4e2c3e --- /dev/null +++ b/test/parallel/test-stream2-readable-bytesread.js @@ -0,0 +1,119 @@ +'use strict'; + +require('../common'); +const assert = require('assert'); +const Readable = require('stream').Readable; +const Duplex = require('stream').Duplex; +const Transform = require('stream').Transform; + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + } + }); + + readable._max = 1000; + readable._index = 1; + + var total = 0; + readable.on('data', function(chunk) { + total += chunk.length; + }); + + readable.on('end', function() { + assert.equal(total, readable.bytesRead); + }); +})(); + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + } + }); + + readable._max = 1000; + readable._index = 1; + + var total = 0; + readable.setEncoding('utf8'); + readable.on('data', function(chunk) { + total += Buffer.byteLength(chunk); + }); + + readable.on('end', function() { + assert.equal(total, readable.bytesRead); + }); +})(); + +(function() { + const duplex = new Duplex({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('a')); + }, + write: function(chunk, encoding, next) { + next(); + } + }); + + duplex._max = 1000; + duplex._index = 1; + + var total = 0; + duplex.setEncoding('utf8'); + duplex.on('data', function(chunk) { + total += Buffer.byteLength(chunk); + }); + + duplex.on('end', function() { + assert.equal(total, duplex.bytesRead); + }); +})(); + +(function() { + const readable = new Readable({ + read: function(n) { + var i = this._index++; + if (i > this._max) + this.push(null); + else + this.push(new Buffer('{"key":"value"}')); + } + }); + readable._max = 1000; + readable._index = 1; + + const transform = new Transform({ + readableObjectMode : true, + transform: function(chunk, encoding, next) { + next(null, JSON.parse(chunk)); + }, + flush: function(done) { + done(); + } + }); + + var total = 0; + readable.on('data', function(chunk) { + total += chunk.length; + }); + + transform.on('end', function() { + assert.equal(0, transform.bytesRead); + assert.equal(total, readable.bytesRead); + }); + readable.pipe(transform); +})();