Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add stream.pipeline and stream.onEnd #19828

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,12 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.

An attempt was made to call [`stream.write()`][] with a `null` chunk.

<a id="ERR_STREAM_PREMATURE_CLOSE"></a>
### ERR_STREAM_PREMATURE_CLOSE

An error returned by `stream.finished()` and `stream.pipeline()`, when a stream
or a pipeline ends non gracefully with no explicit error.

<a id="ERR_STREAM_PUSH_AFTER_EOF"></a>
### ERR_STREAM_PUSH_AFTER_EOF

Expand Down
106 changes: 106 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ There are four fundamental stream types within Node.js:
* [Transform][] - Duplex streams that can modify or transform the data as it
is written and read (for example [`zlib.createDeflate()`][]).

Additionally this module includes the utility functions [pipeline][] and
[finished][].

### Object Mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
Expand Down Expand Up @@ -1283,6 +1286,107 @@ implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].
The default implementation of `_destroy` for `Transform` also emit `'close'`.

### stream.finished(stream, callback)
<!-- YAML
added: REPLACEME
-->

* `stream` {Stream} A readable and/or writable stream.
* `callback` {Function} A callback function that takes an optional error
argument.

A function to get notified when a stream is no longer readable, writable
or has experienced an error or a premature close event.

```js
const { finished } = require('stream');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
if (err) {
console.error('Stream failed', err);
} else {
console.log('Stream is done reading');
}
});

rs.resume(); // drain the stream
```

Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

The `finished` API is promisify'able as well;

```js
const finished = util.promisify(stream.finished);

const rs = fs.createReadStream('archive.tar');

async function run() {
await finished(rs);
console.log('Stream is done reading');
}

run().catch(console.error);
rs.resume(); // drain the stream
```

### stream.pipeline(...streams[, callback])
<!-- YAML
added: REPLACEME
-->

* `...streams` {Stream} Two or more streams to pipe between.
* `callback` {Function} A callback function that takes an optional error
argument.

A module method to pipe between streams forwarding errors and properly cleaning
up and provide a callback when the pipeline is complete.

```js
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
}
);
```

The `pipeline` API is promisify'able as well:

```js
const pipeline = util.promisify(stream.pipeline);

async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz')
);
console.log('Pipeline succeeded');
}

run().catch(console.error);
```

## API for Stream Implementers

<!--type=misc-->
Expand Down Expand Up @@ -2395,6 +2499,8 @@ contain multi-byte characters.
[http-incoming-message]: http.html#http_class_http_incomingmessage
[zlib]: zlib.html
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
[pipeline]: #stream_stream_pipeline_streams_callback
[finished]: #stream_stream_finished_stream_callback
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
'stream.unshift() after end event', Error);
Expand Down
96 changes: 96 additions & 0 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).

'use strict';

const {
ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;

function noop() {}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function once(callback) {
let called = false;
return function(err) {
if (called) return;
called = true;
callback.call(this, err);
};
}

function eos(stream, opts, callback) {
if (typeof opts === 'function') return eos(stream, null, opts);
if (!opts) opts = {};

callback = once(callback || noop);

const ws = stream._writableState;
const rs = stream._readableState;
let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

const onfinish = () => {
writable = false;
if (!readable) callback.call(stream);
};

const onend = () => {
readable = false;
if (!writable) callback.call(stream);
};

const onerror = (err) => {
callback.call(stream, err);
};

const onclose = () => {
if (readable && !(rs && rs.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !(ws && ws.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
};

const onrequest = () => {
stream.req.on('finish', onfinish);
};

if (isRequest(stream)) {
stream.on('complete', onfinish);
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !ws) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}

stream.on('end', onend);
stream.on('finish', onfinish);
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

return function() {
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
stream.removeListener('request', onrequest);
if (stream.req) stream.req.removeListener('finish', onfinish);
stream.removeListener('end', onlegacyfinish);
stream.removeListener('close', onlegacyfinish);
stream.removeListener('finish', onfinish);
stream.removeListener('end', onend);
stream.removeListener('error', onerror);
stream.removeListener('close', onclose);
};
}

module.exports = eos;
95 changes: 95 additions & 0 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Ported from https://github.com/mafintosh/pump with
// permission from the author, Mathias Buus (@mafintosh).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the original code has a copyright/license, then it should likely either be included here or embedded in the LICENSE file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does, but I did mention here I'm fine waiving copyright, mafintosh/pump#17 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I'm fine with whatever is easiest btw)


'use strict';

const eos = require('internal/streams/end-of-stream');

const {
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;

function once(callback) {
let called = false;
return function(err) {
if (called) return;
called = true;
callback(err);
};
}

function noop() {}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function destroyer(stream, reading, writing, callback) {
callback = once(callback);

let closed = false;
stream.on('close', () => {
closed = true;
});

eos(stream, { readable: reading, writable: writing }, (err) => {
if (err) return callback(err);
closed = true;
callback();
});

let destroyed = false;
return (err) => {
if (closed) return;
if (destroyed) return;
destroyed = true;

// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (typeof stream.destroy === 'function') return stream.destroy();

callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}

function call(fn) {
fn();
}

function pipe(from, to) {
return from.pipe(to);
}

function popCallback(streams) {
if (!streams.length) return noop;
if (typeof streams[streams.length - 1] !== 'function') return noop;
return streams.pop();
}

function pipeline(...streams) {
const callback = popCallback(streams);

if (Array.isArray(streams[0])) streams = streams[0];

if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}

let error;
const destroys = streams.map(function(stream, i) {
const reading = i < streams.length - 1;
const writing = i > 0;
return destroyer(stream, reading, writing, function(err) {
if (!error) error = err;
if (err) destroys.forEach(call);
if (reading) return;
destroys.forEach(call);
callback(error);
});
});

return streams.reduce(pipe);
}

module.exports = pipeline;
5 changes: 5 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
'use strict';

const { Buffer } = require('buffer');
const pipeline = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
Expand All @@ -33,6 +35,9 @@ Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');

Stream.pipeline = pipeline;
Stream.finished = eos;

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

Expand Down
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@
'lib/internal/streams/legacy.js',
'lib/internal/streams/destroy.js',
'lib/internal/streams/state.js',
'lib/internal/streams/pipeline.js',
'lib/internal/streams/end-of-stream.js',
'lib/internal/wrap_js_stream.js',
'deps/v8/tools/splaytree.js',
'deps/v8/tools/codemap.js',
Expand Down
Loading