From 09d8c0c8d2c17fc07e9e313c0f35a7b696d18bc8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 10 Jul 2021 15:03:02 +0200 Subject: [PATCH] stream: destroy readable on read error PR-URL: https://github.com/nodejs/node/pull/39342 Reviewed-By: James M Snell Reviewed-By: Matteo Collina --- doc/api/stream.md | 3 - lib/internal/streams/readable.js | 30 +++++++--- .../test-stream-readable-async-iterators.js | 60 ------------------- ...tream-readable-with-unimplemented-_read.js | 21 +++---- 4 files changed, 29 insertions(+), 85 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 603e0407a8c60c..63f444ec796655 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1525,9 +1525,6 @@ added: v16.3.0 * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the async iterator, or exiting a `for await...of` iteration using a `break`, `return`, or `throw` will not destroy the stream. **Default:** `true`. - * `destroyOnError` {boolean} When set to `false`, if the stream emits an - error while it's being iterated, the iterator will not destroy the stream. - **Default:** `true`. * Returns: {AsyncIterator} to consume the stream. The iterator created by this method gives users the option to cancel the diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index f25413fa9d6269..5e558fb0f58d54 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -486,7 +486,22 @@ Readable.prototype.read = function(n) { state.needReadable = true; // Call internal read method - this._read(state.highWaterMark); + try { + const result = this._read(state.highWaterMark); + if (result != null) { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + nop, + function(err) { + errorOrDestroy(this, err); + }); + } + } + } catch (err) { + errorOrDestroy(this, err); + } state.sync = false; // If _read pushed data synchronously, then `reading` will be false, @@ -1131,14 +1146,11 @@ async function* createAsyncIterator(stream, options) { error = aggregateTwoErrors(error, err); throw error; } finally { - if (error) { - if (options?.destroyOnError !== false) { - destroyImpl.destroyer(stream, error); - } - } else if (options?.destroyOnReturn !== false) { - if (error === undefined || stream._readableState.autoDestroy) { - destroyImpl.destroyer(stream, null); - } + if ( + (error || options?.destroyOnReturn !== false) && + (error === undefined || stream._readableState.autoDestroy) + ) { + destroyImpl.destroyer(stream, null); } } } diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index d546505062eeef..45b27a155d0efd 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -750,22 +750,6 @@ async function tests() { })()); } - function createErrorReadable() { - const opts = { read() { throw new Error('inner'); } }; - return new Readable(opts); - } - - // Check default destroys on return - (async function() { - const readable = createReadable(); - for await (const chunk of readable.iterator()) { - assert.strictEqual(chunk, 5); - break; - } - - assert.ok(readable.destroyed); - })().then(common.mustCall()); - // Check explicit destroying on return (async function() { const readable = createReadable(); @@ -777,50 +761,6 @@ async function tests() { assert.ok(readable.destroyed); })().then(common.mustCall()); - // Check default destroys on error - (async function() { - const readable = createErrorReadable(); - try { - // eslint-disable-next-line no-unused-vars - for await (const chunk of readable) { } - assert.fail('should have thrown'); - } catch (err) { - assert.strictEqual(err.message, 'inner'); - } - - assert.ok(readable.destroyed); - })().then(common.mustCall()); - - // Check explicit destroys on error - (async function() { - const readable = createErrorReadable(); - const opts = { destroyOnError: true, destroyOnReturn: false }; - try { - // eslint-disable-next-line no-unused-vars - for await (const chunk of readable.iterator(opts)) { } - assert.fail('should have thrown'); - } catch (err) { - assert.strictEqual(err.message, 'inner'); - } - - assert.ok(readable.destroyed); - })().then(common.mustCall()); - - // Check explicit non-destroy with return true - (async function() { - const readable = createErrorReadable(); - const opts = { destroyOnError: false, destroyOnReturn: true }; - try { - // eslint-disable-next-line no-unused-vars - for await (const chunk of readable.iterator(opts)) { } - assert.fail('should have thrown'); - } catch (err) { - assert.strictEqual(err.message, 'inner'); - } - - assert.ok(!readable.destroyed); - })().then(common.mustCall()); - // Check explicit non-destroy with return true (async function() { const readable = createReadable(); diff --git a/test/parallel/test-stream-readable-with-unimplemented-_read.js b/test/parallel/test-stream-readable-with-unimplemented-_read.js index 16ec2ac8cd8852..85e83aa3b61da0 100644 --- a/test/parallel/test-stream-readable-with-unimplemented-_read.js +++ b/test/parallel/test-stream-readable-with-unimplemented-_read.js @@ -1,18 +1,13 @@ 'use strict'; -require('../common'); - -const assert = require('assert'); +const common = require('../common'); const { Readable } = require('stream'); const readable = new Readable(); -assert.throws( - () => { - readable.read(); - }, - { - code: 'ERR_METHOD_NOT_IMPLEMENTED', - name: 'Error', - message: 'The _read() method is not implemented' - } -); +readable.read(); +readable.on('error', common.expectsError({ + code: 'ERR_METHOD_NOT_IMPLEMENTED', + name: 'Error', + message: 'The _read() method is not implemented' +})); +readable.on('close', common.mustCall());