From 4e3f6f355b892d047e7e5a8ef5a65fb189e47f89 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 4 Jul 2020 18:55:50 +0200 Subject: [PATCH] stream: cleanup and fix Readable.wrap Cleans up Readable.wrap and also ensures destroy is called for certain events. PR-URL: https://github.com/nodejs/node/pull/34204 Reviewed-By: Anna Henningsen Reviewed-By: Luigi Pinca Reviewed-By: James M Snell --- lib/_stream_readable.js | 71 +++++-------------- .../test-stream2-readable-wrap-destroy.js | 27 +++++++ test/parallel/test-stream2-readable-wrap.js | 10 +++ 3 files changed, 56 insertions(+), 52 deletions(-) create mode 100644 test/parallel/test-stream2-readable-wrap-destroy.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 8ddb9562a41af2..cab1f4d8785028 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -26,6 +26,7 @@ const { NumberIsInteger, NumberIsNaN, ObjectDefineProperties, + ObjectKeys, ObjectSetPrototypeOf, Set, SymbolAsyncIterator, @@ -1007,83 +1008,49 @@ function flow(stream) { // This is *not* part of the readable stream interface. // It is an ugly unfortunate mess of history. Readable.prototype.wrap = function(stream) { - const state = this._readableState; let paused = false; - stream.on('end', () => { - debug('wrapped end'); - if (state.decoder && !state.ended) { - const chunk = state.decoder.end(); - if (chunk && chunk.length) - this.push(chunk); - } - - this.push(null); - }); + // TODO (ronag): Should this.destroy(err) emit + // 'error' on the wrapped stream? Would require + // a static factory method, e.g. Readable.wrap(stream). stream.on('data', (chunk) => { - debug('wrapped data'); - if (state.decoder) - chunk = state.decoder.write(chunk); - - // Don't skip over falsy values in objectMode. - if (state.objectMode && (chunk === null || chunk === undefined)) - return; - else if (!state.objectMode && (!chunk || !chunk.length)) - return; - - const ret = this.push(chunk); - if (!ret) { + if (!this.push(chunk) && stream.pause) { paused = true; stream.pause(); } }); - // Proxy all the other methods. Important when wrapping filters and duplexes. - for (const i in stream) { - if (this[i] === undefined && typeof stream[i] === 'function') { - this[i] = function methodWrap(method) { - return function methodWrapReturnFunction() { - return stream[method].apply(stream, arguments); - }; - }(i); - } - } + stream.on('end', () => { + this.push(null); + }); stream.on('error', (err) => { errorOrDestroy(this, err); }); stream.on('close', () => { - // TODO(ronag): Update readable state? - this.emit('close'); + this.destroy(); }); stream.on('destroy', () => { - // TODO(ronag): this.destroy()? - this.emit('destroy'); + this.destroy(); }); - stream.on('pause', () => { - // TODO(ronag): this.pause()? - this.emit('pause'); - }); - - stream.on('resume', () => { - // TODO(ronag): this.resume()? - this.emit('resume'); - }); - - // When we try to consume some more bytes, simply unpause the - // underlying stream. - this._read = (n) => { - debug('wrapped _read', n); - if (paused) { + this._read = () => { + if (paused && stream.resume) { paused = false; stream.resume(); } }; + // Proxy all the other methods. Important when wrapping filters and duplexes. + for (const i of ObjectKeys(stream)) { + if (this[i] === undefined && typeof stream[i] === 'function') { + this[i] = stream[i].bind(stream); + } + } + return this; }; diff --git a/test/parallel/test-stream2-readable-wrap-destroy.js b/test/parallel/test-stream2-readable-wrap-destroy.js new file mode 100644 index 00000000000000..b0f4714c741202 --- /dev/null +++ b/test/parallel/test-stream2-readable-wrap-destroy.js @@ -0,0 +1,27 @@ +'use strict'; +const common = require('../common'); + +const Readable = require('_stream_readable'); +const EE = require('events').EventEmitter; + +const oldStream = new EE(); +oldStream.pause = () => {}; +oldStream.resume = () => {}; + +{ + new Readable({ + autoDestroy: false, + destroy: common.mustCall() + }) + .wrap(oldStream); + oldStream.emit('destroy'); +} + +{ + new Readable({ + autoDestroy: false, + destroy: common.mustCall() + }) + .wrap(oldStream); + oldStream.emit('close'); +} diff --git a/test/parallel/test-stream2-readable-wrap.js b/test/parallel/test-stream2-readable-wrap.js index 0c9cb5861d936e..69f055fd7e535e 100644 --- a/test/parallel/test-stream2-readable-wrap.js +++ b/test/parallel/test-stream2-readable-wrap.js @@ -44,6 +44,16 @@ function runTest(highWaterMark, objectMode, produce) { flow(); }; + // Make sure pause is only emitted once. + let pausing = false; + r.on('pause', () => { + assert.strictEqual(pausing, false); + pausing = true; + process.nextTick(() => { + pausing = false; + }); + }); + let flowing; let chunks = 10; let oldEnded = false;