Skip to content

Commit

Permalink
stream: make all streams error in a pipeline
Browse files Browse the repository at this point in the history
This changes makes all stream in a pipeline emit 'error' in
case of an abnormal termination of the pipeline. If the last stream
is currently being async iterated, this change will make the iteration
reject accordingly.

See: nodejs#30861
Fixes: nodejs#28194
  • Loading branch information
mcollina committed Dec 12, 2019
1 parent edf654d commit a384141
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
24 changes: 18 additions & 6 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,21 @@ function destroyer(stream, reading, writing, callback) {

// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (typeof stream.destroy === 'function') return stream.destroy();
if (typeof stream.destroy === 'function') {
if (stream.req && stream._writableState === undefined) {
// This is a ClientRequest
// TODO(mcollina): backward compatible fix to avoid crashing.
// Possibly remove in a later semver-major change.
stream.req.on('error', noop);
}
return stream.destroy(err);
}

callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}

function call(fn) {
fn();
}
function noop() {}

function pipe(from, to) {
return from.pipe(to);
Expand Down Expand Up @@ -81,9 +87,15 @@ function pipeline(...streams) {
const writing = i > 0;
return destroyer(stream, reading, writing, function(err) {
if (!error) error = err;
if (err) destroys.forEach(call);
if (err) {
for (const destroy of destroys) {
destroy(err);
}
}
if (reading) return;
destroys.forEach(call);
for (const destroy of destroys) {
destroy();
}
callback(error);
});
});
Expand Down
31 changes: 31 additions & 0 deletions test/parallel/test-stream-pipeline-async-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict';

const common = require('../common');
const { Readable, PassThrough, pipeline } = require('stream');
const assert = require('assert');

const _err = new Error('kaboom');

async function run() {
const source = new Readable({
read() {
}
});
source.push('hello');
source.push('world');

setImmediate(() => { source.destroy(_err); });

const iterator = pipeline(
source,
new PassThrough(),
() => {});

iterator.setEncoding('utf8');

for await (const k of iterator) {
assert.strictEqual(k, 'helloworld');
}
}

run().catch(common.mustCall((err) => assert.strictEqual(err, _err)));
6 changes: 6 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ const { promisify } = require('util');
transform.on('close', common.mustCall());
write.on('close', common.mustCall());

[read, transform, write].forEach((stream) => {
stream.on('error', common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
});

const dst = pipeline(read, transform, write, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
Expand Down

0 comments on commit a384141

Please sign in to comment.