Skip to content

Commit

Permalink
stream: don't destroy on async iterator success
Browse files Browse the repository at this point in the history
Destroying on async iterator completion ignores autoDestroy.

PR-URL: #35122
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag authored and nodejs-github-bot committed Sep 21, 2020
1 parent 5461794 commit 2b9003b
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 14 deletions.
40 changes: 29 additions & 11 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ Readable.prototype[SymbolAsyncIterator] = function() {
objectMode: true,
destroy(err, callback) {
destroyImpl.destroyer(src, err);
callback();
callback(err);
}
}).wrap(src);
}
Expand All @@ -1088,24 +1088,39 @@ async function* createAsyncIterator(stream) {
}
}

const state = stream._readableState;

let error = state.errored;
let errorEmitted = state.errorEmitted;
let endEmitted = state.endEmitted;
let closeEmitted = state.closeEmitted;

stream
.on('readable', next)
.on('error', next)
.on('end', next)
.on('close', next);
.on('error', function(err) {
error = err;
errorEmitted = true;
next.call(this);
})
.on('end', function() {
endEmitted = true;
next.call(this);
})
.on('close', function() {
closeEmitted = true;
next.call(this);
});

try {
const state = stream._readableState;
while (true) {
const chunk = stream.read();
if (chunk !== null) {
yield chunk;
} else if (state.errored) {
throw state.errored;
} else if (state.ended) {
} else if (errorEmitted) {
throw error;
} else if (endEmitted) {
break;
} else if (state.closed) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
} else if (closeEmitted) {
break;
} else {
await new Promise(next);
Expand All @@ -1115,7 +1130,10 @@ async function* createAsyncIterator(stream) {
destroyImpl.destroyer(stream, err);
throw err;
} finally {
destroyImpl.destroyer(stream, null);
if (state.autoDestroy || !endEmitted) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
destroyImpl.destroyer(stream, null);
}
}
}

Expand Down
137 changes: 134 additions & 3 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const {
pipeline
} = require('stream');
const assert = require('assert');
const http = require('http');

async function tests() {
{
Expand Down Expand Up @@ -44,9 +45,11 @@ async function tests() {
const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
await iter.next();
await iter.next();
await iter.next().catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
await iter.next()
.then(common.mustNotCall())
.catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
}

{
Expand Down Expand Up @@ -581,6 +584,61 @@ async function tests() {
assert.strictEqual(err, _err);
}));
}

{
// Don't destroy if no auto destroy.
// https://github.com/nodejs/node/issues/35116

const r = new Readable({
autoDestroy: false,
read() {
this.push('asd');
this.push(null);
}
});

for await (const chunk of r) {
chunk;
}
assert.strictEqual(r.destroyed, false);
}

{
// Destroy if no auto destroy and premature break.
// https://github.com/nodejs/node/pull/35122/files#r485678318

const r = new Readable({
autoDestroy: false,
read() {
this.push('asd');
}
});

for await (const chunk of r) {
chunk;
break;
}
assert.strictEqual(r.destroyed, true);
}

{
// Don't destroy before 'end'.

const r = new Readable({
read() {
this.push('asd');
this.push(null);
}
}).on('end', () => {
assert.strictEqual(r.destroyed, false);
});

for await (const chunk of r) {
chunk;
}

assert.strictEqual(r.destroyed, true);
}
}

{
Expand Down Expand Up @@ -643,5 +701,78 @@ async function tests() {
});
}

{
let _req;
const server = http.createServer((request, response) => {
response.statusCode = 404;
response.write('never ends');
});

server.listen(() => {
_req = http.request(`http://localhost:${server.address().port}`)
.on('response', common.mustCall(async (res) => {
setTimeout(() => {
_req.destroy(new Error('something happened'));
}, 100);

res.on('error', common.mustCall());

let _err;
try {
for await (const chunk of res) {
chunk;
}
} catch (err) {
_err = err;
}

assert.strictEqual(_err.code, 'ECONNRESET');
server.close();
}))
.on('error', common.mustCall())
.end();
});
}

{
async function getParsedBody(request) {
let body = '';

for await (const data of request) {
body += data;
}

try {
return JSON.parse(body);
} catch {
return {};
}
}

const str = JSON.stringify({ asd: true });
const server = http.createServer(async (request, response) => {
const body = await getParsedBody(request);
response.statusCode = 200;
assert.strictEqual(JSON.stringify(body), str);
response.end(JSON.stringify(body));
}).listen(() => {
http
.request({
method: 'POST',
hostname: 'localhost',
port: server.address().port,
})
.end(str)
.on('response', async (res) => {
let body = '';
for await (const chunk of res) {
body += chunk;
}
assert.strictEqual(body, str);
server.close();
});
});
}

// To avoid missing some tests if a promise does not resolve
tests().then(common.mustCall());

0 comments on commit 2b9003b

Please sign in to comment.