diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 373e42907bf0ca..abd7faf59f2723 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1066,7 +1066,7 @@ Readable.prototype[SymbolAsyncIterator] = function() { objectMode: true, destroy(err, callback) { destroyImpl.destroyer(src, err); - callback(); + callback(err); } }).wrap(src); } @@ -1088,24 +1088,39 @@ async function* createAsyncIterator(stream) { } } + const state = stream._readableState; + + let error = state.errored; + let errorEmitted = state.errorEmitted; + let endEmitted = state.endEmitted; + let closeEmitted = state.closeEmitted; + stream .on('readable', next) - .on('error', next) - .on('end', next) - .on('close', next); + .on('error', function(err) { + error = err; + errorEmitted = true; + next.call(this); + }) + .on('end', function() { + endEmitted = true; + next.call(this); + }) + .on('close', function() { + closeEmitted = true; + next.call(this); + }); try { - const state = stream._readableState; while (true) { const chunk = stream.read(); if (chunk !== null) { yield chunk; - } else if (state.errored) { - throw state.errored; - } else if (state.ended) { + } else if (errorEmitted) { + throw error; + } else if (endEmitted) { break; - } else if (state.closed) { - // TODO(ronag): ERR_PREMATURE_CLOSE? + } else if (closeEmitted) { break; } else { await new Promise(next); @@ -1115,7 +1130,10 @@ async function* createAsyncIterator(stream) { destroyImpl.destroyer(stream, err); throw err; } finally { - destroyImpl.destroyer(stream, null); + if (state.autoDestroy || !endEmitted) { + // TODO(ronag): ERR_PREMATURE_CLOSE? + destroyImpl.destroyer(stream, null); + } } } diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 604ba3afb47fe7..c2fde72457f22f 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -9,6 +9,7 @@ const { pipeline } = require('stream'); const assert = require('assert'); +const http = require('http'); async function tests() { { @@ -44,9 +45,11 @@ async function tests() { const iter = Readable.prototype[Symbol.asyncIterator].call(stream); await iter.next(); await iter.next(); - await iter.next().catch(common.mustCall((err) => { - assert.strictEqual(err.message, 'asd'); - })); + await iter.next() + .then(common.mustNotCall()) + .catch(common.mustCall((err) => { + assert.strictEqual(err.message, 'asd'); + })); } { @@ -581,6 +584,61 @@ async function tests() { assert.strictEqual(err, _err); })); } + + { + // Don't destroy if no auto destroy. + // https://github.com/nodejs/node/issues/35116 + + const r = new Readable({ + autoDestroy: false, + read() { + this.push('asd'); + this.push(null); + } + }); + + for await (const chunk of r) { + chunk; + } + assert.strictEqual(r.destroyed, false); + } + + { + // Destroy if no auto destroy and premature break. + // https://github.com/nodejs/node/pull/35122/files#r485678318 + + const r = new Readable({ + autoDestroy: false, + read() { + this.push('asd'); + } + }); + + for await (const chunk of r) { + chunk; + break; + } + assert.strictEqual(r.destroyed, true); + } + + { + // Don't destroy before 'end'. + + const r = new Readable({ + read() { + this.push('asd'); + this.push(null); + } + }).on('end', () => { + assert.strictEqual(r.destroyed, false); + }); + + for await (const chunk of r) { + chunk; + } + + assert.strictEqual(r.destroyed, true); + } } { @@ -643,5 +701,78 @@ async function tests() { }); } +{ + let _req; + const server = http.createServer((request, response) => { + response.statusCode = 404; + response.write('never ends'); + }); + + server.listen(() => { + _req = http.request(`http://localhost:${server.address().port}`) + .on('response', common.mustCall(async (res) => { + setTimeout(() => { + _req.destroy(new Error('something happened')); + }, 100); + + res.on('error', common.mustCall()); + + let _err; + try { + for await (const chunk of res) { + chunk; + } + } catch (err) { + _err = err; + } + + assert.strictEqual(_err.code, 'ECONNRESET'); + server.close(); + })) + .on('error', common.mustCall()) + .end(); + }); +} + +{ + async function getParsedBody(request) { + let body = ''; + + for await (const data of request) { + body += data; + } + + try { + return JSON.parse(body); + } catch { + return {}; + } + } + + const str = JSON.stringify({ asd: true }); + const server = http.createServer(async (request, response) => { + const body = await getParsedBody(request); + response.statusCode = 200; + assert.strictEqual(JSON.stringify(body), str); + response.end(JSON.stringify(body)); + }).listen(() => { + http + .request({ + method: 'POST', + hostname: 'localhost', + port: server.address().port, + }) + .end(str) + .on('response', async (res) => { + let body = ''; + for await (const chunk of res) { + body += chunk; + } + assert.strictEqual(body, str); + server.close(); + }); + }); +} + // To avoid missing some tests if a promise does not resolve tests().then(common.mustCall());