From 0154e63dffe598cac8fe646bcaf3c0205c2b061d Mon Sep 17 00:00:00 2001 From: jakecastelli <959672929@qq.com> Date: Sun, 16 Jun 2024 00:45:22 +0930 Subject: [PATCH] stream: pipeline wait for close before calling the callback The pipeline should wait for close event to finish before calling the callback. The `finishCount` should not below 0 when calling finish function. Fixes: https://github.com/nodejs/node/issues/51540 Co-authored-by: wh0 --- lib/internal/streams/pipeline.js | 12 +++++--- test/parallel/test-stream-pipeline.js | 42 +++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index bb34759b1fea12..83c53d80a32f5f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -225,6 +225,10 @@ function pipelineImpl(streams, callback, opts) { finishImpl(err, --finishCount === 0); } + function finishOnlyHandleError(err) { + finishImpl(err, false); + } + function finishImpl(err, final) { if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { error = err; @@ -279,7 +283,7 @@ function pipelineImpl(streams, callback, opts) { err.name !== 'AbortError' && err.code !== 'ERR_STREAM_PREMATURE_CLOSE' ) { - finish(err); + finishOnlyHandleError(err); } } stream.on('error', onError); @@ -372,7 +376,7 @@ function pipelineImpl(streams, callback, opts) { } else if (isNodeStream(stream)) { if (isReadableNodeStream(ret)) { finishCount += 2; - const cleanup = pipe(ret, stream, finish, { end }); + const cleanup = pipe(ret, stream, finish, finishOnlyHandleError, { end }); if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(cleanup); } @@ -415,12 +419,12 @@ function pipelineImpl(streams, callback, opts) { return ret; } -function pipe(src, dst, finish, { end }) { +function pipe(src, dst, finish, finishOnlyHandleError, { end }) { let ended = false; dst.on('close', () => { if (!ended) { // Finish if the destination closes before the source has completed. - finish(new ERR_STREAM_PREMATURE_CLOSE()); + finishOnlyHandleError(new ERR_STREAM_PREMATURE_CLOSE()); } }); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 7e69754b36d771..fc721c3a562b9e 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1677,5 +1677,47 @@ tmpdir.refresh(); pipeline(r, w, common.mustCall((err) => { assert.strictEqual(err, undefined); })); +} + +{ + // See https://github.com/nodejs/node/issues/51540 for the following 2 tests + const src = new Readable(); + const dst = new Writable({ + destroy(error, cb) { + // Takes a while to destroy + setImmediate(cb); + }, + }); + + pipeline(src, dst, (err) => { + assert.strictEqual(src.closed, true); + assert.strictEqual(dst.closed, true); + assert.strictEqual(err.message, 'problem'); + }); + src.destroy(new Error('problem')); +} +{ + const src = new Readable(); + const dst = new Writable({ + destroy(error, cb) { + // Takes a while to destroy + setImmediate(cb); + }, + }); + const passThroughs = []; + for (let i = 0; i < 10; i++) { + passThroughs.push(new PassThrough()); + } + + pipeline(src, ...passThroughs, dst, (err) => { + assert.strictEqual(src.closed, true); + assert.strictEqual(dst.closed, true); + assert.strictEqual(err.message, 'problem'); + + for (let i = 0; i < passThroughs.length; i++) { + assert.strictEqual(passThroughs[i].closed, true); + } + }); + src.destroy(new Error('problem')); }