From 111f0bd9b61b71362b07872a7b33065c95381b3d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 18 Oct 2021 12:08:47 +0200 Subject: [PATCH] stream: fix fromAsyncGen Fixes: https://github.com/nodejs/node/issues/40497 PR-URL: https://github.com/nodejs/node/pull/40499 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum --- lib/internal/streams/duplexify.js | 14 ++++++++---- test/parallel/test-stream-duplex-from.js | 28 +++++++++++++++++++++++- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index cbcbf1fd69cc3b..a15d97d36a5f6e 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -209,22 +209,28 @@ function fromAsyncGen(fn) { const signal = ac.signal; const value = fn(async function*() { while (true) { - const { chunk, done, cb } = await promise; + const _promise = promise; + promise = null; + const { chunk, done, cb } = await _promise; process.nextTick(cb); if (done) return; if (signal.aborted) throw new AbortError(); - yield chunk; ({ promise, resolve } = createDeferredPromise()); + yield chunk; } }(), { signal }); return { value, write(chunk, encoding, cb) { - resolve({ chunk, done: false, cb }); + const _resolve = resolve; + resolve = null; + _resolve({ chunk, done: false, cb }); }, final(cb) { - resolve({ done: true, cb }); + const _resolve = resolve; + resolve = null; + _resolve({ done: true, cb }); }, destroy(err, cb) { ac.abort(); diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index 265b61dfd062f9..446768d6eef3e3 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -2,7 +2,7 @@ const common = require('../common'); const assert = require('assert'); -const { Duplex, Readable, Writable } = require('stream'); +const { Duplex, Readable, Writable, pipeline } = require('stream'); { const d = Duplex.from({ @@ -118,3 +118,29 @@ const { Duplex, Readable, Writable } = require('stream'); assert.strictEqual(d.readable, false); })); } + +{ + // https://github.com/nodejs/node/issues/40497 + pipeline( + ['abc\ndef\nghi'], + Duplex.from(async function * (source) { + let rest = ''; + for await (const chunk of source) { + const lines = (rest + chunk.toString()).split('\n'); + rest = lines.pop(); + for (const line of lines) { + yield line; + } + } + yield rest; + }), + async function * (source) { + let ret = ''; + for await (const x of source) { + ret += x; + } + assert.strictEqual(ret, 'abcdefghi'); + }, + common.mustCall(() => {}), + ); +}