From f3afed178f3dc3a87c93471a96ab4d462cc45b86 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 19 Nov 2021 19:14:32 +0100 Subject: [PATCH] stream: pipeline with end option Currently pipeline cannot fully replace pipe due to the missing end option. This PR adds the end option to the promisified pipeline method. --- lib/internal/streams/pipeline.js | 18 ++++++++++++------ lib/stream/promises.js | 4 +++- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ab7e3cbb6c8a26..59ae617c1d019d 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -109,7 +109,7 @@ async function* fromReadable(val) { yield* Readable.prototype[SymbolAsyncIterator].call(val); } -async function pump(iterable, writable, finish) { +async function pump(iterable, writable, finish, opts) { let error; let onresolve = null; @@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) { } } - writable.end(); + if (opts !== false) { + writable.end(); + } await wait(); @@ -227,6 +229,7 @@ function pipelineImpl(streams, callback, opts) { const stream = streams[i]; const reading = i < streams.length - 1; const writing = i > 0; + const end = reading || opts?.end !== false; if (isNodeStream(stream)) { finishCount++; @@ -282,14 +285,17 @@ function pipelineImpl(streams, callback, opts) { then.call(ret, (val) => { value = val; - pt.end(val); + pt.write(val); + if (end) { + pt.end(); + } }, (err) => { pt.destroy(err); }, ); } else if (isIterable(ret, true)) { finishCount++; - pump(ret, pt, finish); + pump(ret, pt, finish, { end }); } else { throw new ERR_INVALID_RETURN_VALUE( 'AsyncIterable or Promise', 'destination', ret); @@ -302,7 +308,7 @@ function pipelineImpl(streams, callback, opts) { } } else if (isNodeStream(stream)) { if (isReadableNodeStream(ret)) { - ret.pipe(stream); + ret.pipe(stream, { end }); // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. @@ -314,7 +320,7 @@ function pipelineImpl(streams, callback, opts) { ret = makeAsyncIterable(ret); finishCount++; - pump(ret, stream, finish); + pump(ret, stream, finish, { end }); } ret = stream; } else { diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 0db01a8b208d60..2fdcad3cc4aa3a 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream'); function pipeline(...streams) { return new Promise((resolve, reject) => { let signal; + let end; const lastArg = streams[streams.length - 1]; if (lastArg && typeof lastArg === 'object' && !isNodeStream(lastArg) && !isIterable(lastArg)) { const options = ArrayPrototypePop(streams); signal = options.signal; + end = options.end; } pl(streams, (err, value) => { @@ -29,7 +31,7 @@ function pipeline(...streams) { } else { resolve(value); } - }, { signal }); + }, { signal, end }); }); }