From 3dc65646c88e095703d7140fae2671fb7ccf53e1 Mon Sep 17 00:00:00 2001 From: Rafael Gonzaga Date: Tue, 21 Dec 2021 07:24:15 -0300 Subject: [PATCH] stream: fix enqueue race condition on esm modules stream: use nextTick on close PR-URL: https://github.com/nodejs/node/pull/40901 Reviewed-By: Robert Nagy Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- lib/internal/webstreams/readablestream.js | 19 +++++++---- test/parallel/test-whatwg-readablestream.mjs | 36 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-whatwg-readablestream.mjs diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index cdabebefe999f0..ed12ec5e5fd703 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1446,13 +1446,18 @@ function readableStreamTee(stream, cloneForBranch2) { }); }, [kClose]() { - reading = false; - if (!canceled1) - readableStreamDefaultControllerClose(branch1[kState].controller); - if (!canceled2) - readableStreamDefaultControllerClose(branch2[kState].controller); - if (!canceled1 || !canceled2) - cancelPromise.resolve(); + // The `process.nextTick()` is not part of the spec. + // This approach was needed to avoid a race condition working with esm + // Further information, see: https://github.com/nodejs/node/issues/39758 + process.nextTick(() => { + reading = false; + if (!canceled1) + readableStreamDefaultControllerClose(branch1[kState].controller); + if (!canceled2) + readableStreamDefaultControllerClose(branch2[kState].controller); + if (!canceled1 || !canceled2) + cancelPromise.resolve(); + }); }, [kError]() { reading = false; diff --git a/test/parallel/test-whatwg-readablestream.mjs b/test/parallel/test-whatwg-readablestream.mjs new file mode 100644 index 00000000000000..a3693f62439ce7 --- /dev/null +++ b/test/parallel/test-whatwg-readablestream.mjs @@ -0,0 +1,36 @@ +import { mustCall } from '../common/index.mjs'; +import { ReadableStream } from 'stream/web'; +import assert from 'assert'; + +{ + // Test tee() with close in the nextTick after enqueue + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return Buffer.concat(chunks).toString(); + } + + const [r1, r2] = new ReadableStream({ + start(controller) { + process.nextTick(() => { + controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114])); + + process.nextTick(() => { + controller.close(); + }); + }); + } + }).tee(); + + (async () => { + const [dataReader1, dataReader2] = await Promise.all([ + read(r1), + read(r2), + ]); + + assert.strictEqual(dataReader1, dataReader2); + assert.strictEqual(dataReader1, 'foobar'); + assert.strictEqual(dataReader2, 'foobar'); + })().then(mustCall()); +}