From 241ffd3d490ffc4bb11a83311baa648c84efb761 Mon Sep 17 00:00:00 2001 From: Weijia Wang Date: Thu, 11 Jul 2019 23:14:49 +0800 Subject: [PATCH] stream: simplify `.pipe()` and `.unpipe()` in Readable Now we are using `pipes` and `pipesCount` in Readable state and the `pipes` value can be a stream or an array of streams. This change reducing them into one `pipes` value, which is an array of streams. It also adds a deprecation warning of `_readableState.pipesCount`. --- doc/api/deprecations.md | 14 +++++ lib/_stream_readable.js | 63 +++++-------------- ...test-stream-pipe-same-destination-twice.js | 12 ++-- .../test-stream-pipe-unpipe-streams.js | 7 +-- ...t-stream-pipescount-deprecation-warning.js | 12 ++++ test/parallel/test-stream-unpipe-event.js | 12 ++-- test/parallel/test-stream2-basic.js | 4 +- 7 files changed, 60 insertions(+), 64 deletions(-) create mode 100644 test/parallel/test-stream-pipescount-deprecation-warning.js diff --git a/doc/api/deprecations.md b/doc/api/deprecations.md index a285db698a7d79..f679e7b25bdbb5 100644 --- a/doc/api/deprecations.md +++ b/doc/api/deprecations.md @@ -2498,6 +2498,20 @@ Type: Runtime Passing a callback to [`worker.terminate()`][] is deprecated. Use the returned `Promise` instead, or a listener to the worker’s `'exit'` event. + +### DEP0133: _readableState.pipesCount + + +Type: Documentation-only + +`_readableState.pipesCount` is deprecated. Please use +`_readableState.pipes.length` instead. + [`--http-parser=legacy`]: cli.html#cli_http_parser_library [`--pending-deprecation`]: cli.html#cli_pending_deprecation [`Buffer.allocUnsafeSlow(size)`]: buffer.html#buffer_class_method_buffer_allocunsafeslow_size diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index d6db7188750ebd..554e672d57df8b 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -30,6 +30,7 @@ const EE = require('events'); const Stream = require('stream'); const { Buffer } = require('buffer'); +const internalUtil = require('internal/util'); const debug = require('internal/util/debuglog').debuglog('stream'); const BufferList = require('internal/streams/buffer_list'); const destroyImpl = require('internal/streams/destroy'); @@ -97,8 +98,7 @@ function ReadableState(options, stream, isDuplex) { // array.shift() this.buffer = new BufferList(); this.length = 0; - this.pipes = null; - this.pipesCount = 0; + this.pipes = []; this.flowing = null; this.ended = false; this.endEmitted = false; @@ -148,6 +148,13 @@ function ReadableState(options, stream, isDuplex) { } } +Object.defineProperty(ReadableState.prototype, 'pipesCount', { + get: internalUtil.deprecate(function() { + return this.pipes.length; + }, '_readableState.pipesCount is deprecated. ' + + 'Use _readableState.pipes.length instead.', 'DEP0133'), +}); + function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); @@ -635,19 +642,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { const src = this; const state = this._readableState; - switch (state.pipesCount) { - case 0: - state.pipes = dest; - break; - case 1: - state.pipes = [state.pipes, dest]; - break; - default: - state.pipes.push(dest); - break; - } - state.pipesCount += 1; - debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); + state.pipes.push(dest); + debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && @@ -717,9 +713,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // to get stuck in a permanently paused state if that write // also returned false. // => Check whether `dest` is still a piping destination. - if (((state.pipesCount === 1 && state.pipes === dest) || - (state.pipesCount > 1 && state.pipes.includes(dest))) && - !cleanedUp) { + if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) { debug('false write response, pause', state.awaitDrain); state.awaitDrain++; } @@ -789,38 +783,16 @@ Readable.prototype.unpipe = function(dest) { const unpipeInfo = { hasUnpiped: false }; // If we're not piping anywhere, then do nothing. - if (state.pipesCount === 0) - return this; - - // Just one destination. most common case. - if (state.pipesCount === 1) { - // Passed in one, but it's not the right one. - if (dest && dest !== state.pipes) - return this; - - if (!dest) - dest = state.pipes; - - // got a match. - state.pipes = null; - state.pipesCount = 0; - state.flowing = false; - if (dest) - dest.emit('unpipe', this, unpipeInfo); + if (state.pipes.length === 0) return this; - } - - // Slow case with multiple pipe destinations. if (!dest) { // remove all. var dests = state.pipes; - var len = state.pipesCount; - state.pipes = null; - state.pipesCount = 0; + state.pipes = []; state.flowing = false; - for (var i = 0; i < len; i++) + for (var i = 0; i < dests.length; i++) dests[i].emit('unpipe', this, { hasUnpiped: false }); return this; } @@ -831,9 +803,8 @@ Readable.prototype.unpipe = function(dest) { return this; state.pipes.splice(index, 1); - state.pipesCount -= 1; - if (state.pipesCount === 1) - state.pipes = state.pipes[0]; + if (state.pipes.length === 0) + state.flowing = false; dest.emit('unpipe', this, unpipeInfo); diff --git a/test/parallel/test-stream-pipe-same-destination-twice.js b/test/parallel/test-stream-pipe-same-destination-twice.js index 1824c0606451a2..ff71639588ea49 100644 --- a/test/parallel/test-stream-pipe-same-destination-twice.js +++ b/test/parallel/test-stream-pipe-same-destination-twice.js @@ -20,15 +20,15 @@ const { PassThrough, Writable } = require('stream'); passThrough.pipe(dest); assert.strictEqual(passThrough._events.data.length, 2); - assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes.length, 2); assert.strictEqual(passThrough._readableState.pipes[0], dest); assert.strictEqual(passThrough._readableState.pipes[1], dest); passThrough.unpipe(dest); assert.strictEqual(passThrough._events.data.length, 1); - assert.strictEqual(passThrough._readableState.pipesCount, 1); - assert.strictEqual(passThrough._readableState.pipes, dest); + assert.strictEqual(passThrough._readableState.pipes.length, 1); + assert.deepStrictEqual(passThrough._readableState.pipes, [dest]); passThrough.write('foobar'); passThrough.pipe(dest); @@ -47,7 +47,7 @@ const { PassThrough, Writable } = require('stream'); passThrough.pipe(dest); assert.strictEqual(passThrough._events.data.length, 2); - assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes.length, 2); assert.strictEqual(passThrough._readableState.pipes[0], dest); assert.strictEqual(passThrough._readableState.pipes[1], dest); @@ -64,7 +64,7 @@ const { PassThrough, Writable } = require('stream'); passThrough.pipe(dest); assert.strictEqual(passThrough._events.data.length, 2); - assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes.length, 2); assert.strictEqual(passThrough._readableState.pipes[0], dest); assert.strictEqual(passThrough._readableState.pipes[1], dest); @@ -72,7 +72,7 @@ const { PassThrough, Writable } = require('stream'); passThrough.unpipe(dest); assert.strictEqual(passThrough._events.data, undefined); - assert.strictEqual(passThrough._readableState.pipesCount, 0); + assert.strictEqual(passThrough._readableState.pipes.length, 0); passThrough.write('foobar'); } diff --git a/test/parallel/test-stream-pipe-unpipe-streams.js b/test/parallel/test-stream-pipe-unpipe-streams.js index c8a383bc61a24b..4cb8413af22f18 100644 --- a/test/parallel/test-stream-pipe-unpipe-streams.js +++ b/test/parallel/test-stream-pipe-unpipe-streams.js @@ -22,7 +22,7 @@ assert.strictEqual(source._readableState.pipes.length, 2); source.unpipe(dest2); -assert.strictEqual(source._readableState.pipes, dest1); +assert.deepStrictEqual(source._readableState.pipes, [dest1]); assert.notStrictEqual(source._readableState.pipes, dest2); dest2.on('unpipe', common.mustNotCall()); @@ -30,7 +30,7 @@ source.unpipe(dest2); source.unpipe(dest1); -assert.strictEqual(source._readableState.pipes, null); +assert.strictEqual(source._readableState.pipes.length, 0); { // Test `cleanup()` if we unpipe all streams. @@ -43,8 +43,7 @@ assert.strictEqual(source._readableState.pipes, null); const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe']; const checkSrcCleanup = common.mustCall(() => { - assert.strictEqual(source._readableState.pipes, null); - assert.strictEqual(source._readableState.pipesCount, 0); + assert.strictEqual(source._readableState.pipes.length, 0); assert.strictEqual(source._readableState.flowing, false); srcCheckEventNames.forEach((eventName) => { diff --git a/test/parallel/test-stream-pipescount-deprecation-warning.js b/test/parallel/test-stream-pipescount-deprecation-warning.js new file mode 100644 index 00000000000000..cc2cffe833d016 --- /dev/null +++ b/test/parallel/test-stream-pipescount-deprecation-warning.js @@ -0,0 +1,12 @@ +'use strict'; + +const common = require('../common'); +const { Readable } = require('stream'); + +const warning = '_readableState.pipesCount is deprecated. ' + + 'Use _readableState.pipes.length instead.'; + +common.expectWarning('DeprecationWarning', warning, 'DEP0133'); + +const readable = new Readable(); +readable._readableState.pipesCount; diff --git a/test/parallel/test-stream-unpipe-event.js b/test/parallel/test-stream-unpipe-event.js index 340502d1a98b7c..46cc8e8cb0ae9e 100644 --- a/test/parallel/test-stream-unpipe-event.js +++ b/test/parallel/test-stream-unpipe-event.js @@ -23,7 +23,7 @@ class NeverEndReadable extends Readable { dest.on('unpipe', common.mustCall()); src.pipe(dest); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 0); + assert.strictEqual(src._readableState.pipes.length, 0); }); } @@ -34,7 +34,7 @@ class NeverEndReadable extends Readable { dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); src.pipe(dest); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 1); + assert.strictEqual(src._readableState.pipes.length, 1); }); } @@ -46,7 +46,7 @@ class NeverEndReadable extends Readable { src.pipe(dest); src.unpipe(dest); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 0); + assert.strictEqual(src._readableState.pipes.length, 0); }); } @@ -57,7 +57,7 @@ class NeverEndReadable extends Readable { dest.on('unpipe', common.mustCall()); src.pipe(dest, { end: false }); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 0); + assert.strictEqual(src._readableState.pipes.length, 0); }); } @@ -68,7 +68,7 @@ class NeverEndReadable extends Readable { dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); src.pipe(dest, { end: false }); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 1); + assert.strictEqual(src._readableState.pipes.length, 1); }); } @@ -80,6 +80,6 @@ class NeverEndReadable extends Readable { src.pipe(dest, { end: false }); src.unpipe(dest); setImmediate(() => { - assert.strictEqual(src._readableState.pipesCount, 0); + assert.strictEqual(src._readableState.pipes.length, 0); }); } diff --git a/test/parallel/test-stream2-basic.js b/test/parallel/test-stream2-basic.js index fa1443fd2ac3a6..094ecabd498171 100644 --- a/test/parallel/test-stream2-basic.js +++ b/test/parallel/test-stream2-basic.js @@ -171,10 +171,10 @@ class TestWriter extends EE { w[0].on('write', function() { if (--writes === 0) { r.unpipe(); - assert.strictEqual(r._readableState.pipes, null); + assert.deepStrictEqual(r._readableState.pipes, []); w[0].end(); r.pipe(w[1]); - assert.strictEqual(r._readableState.pipes, w[1]); + assert.deepStrictEqual(r._readableState.pipes, [w[1]]); } });