Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream pump #13506

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ There are four fundamental stream types within Node.js:
* [Transform][] - Duplex streams that can modify or transform the data as it
is written and read (for example [`zlib.createDeflate()`][]).

Additionally this module inclues a utility function [pump][].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/inclues/includes/


### Object Mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
Expand Down Expand Up @@ -89,7 +91,7 @@ total size of the internal write buffer is below the threshold set by
the size of the internal buffer reaches or exceeds the `highWaterMark`, `false`
will be returned.

A key goal of the `stream` API, particularly the [`stream.pipe()`] method,
A key goal of the `stream` API, particularly the [`stream.pump()`] function,
is to limit the buffering of data to acceptable levels such that sources and
destinations of differing speeds will not overwhelm the available memory.

Expand Down Expand Up @@ -1244,6 +1246,14 @@ implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].
The default implementation of `_destroy` for `Transform` also emit `'close'`.

#### Class Method: stream.pump(...streams[, callback])

* two or more streams to pipe between
* optional callback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters should be formatted according to the style used in the rest of the documentation.

Also, it would be a good idea to either add the function signature here in some way or at the very least describe in the description below what parameters the callback should have.


A class method to pipe between streams forwarding errors and properly cleaning
up.

## API for Stream Implementers

<!--type=misc-->
Expand Down Expand Up @@ -2334,14 +2344,15 @@ contain multi-byte characters.
[TCP sockets]: net.html#net_class_net_socket
[Transform]: #stream_class_stream_transform
[Writable]: #stream_class_stream_writable
[async-iterator]: https://github.com/tc39/proposal-async-iteration
[child process stdin]: child_process.html#child_process_subprocess_stdin
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout
[crypto]: crypto.html
[fs read streams]: fs.html#fs_class_fs_readstream
[fs write streams]: fs.html#fs_class_fs_writestream
[http-incoming-message]: http.html#http_class_http_incomingmessage
[zlib]: zlib.html
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
[pump]: #stream_class_method_pump
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
Expand All @@ -2358,4 +2369,4 @@ contain multi-byte characters.
[readable-destroy]: #stream_readable_destroy_error
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[async-iterator]: https://github.com/tc39/proposal-async-iteration
[zlib]: zlib.html
191 changes: 191 additions & 0 deletions lib/_stream_pump.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// The MIT License (MIT)
//
// Copyright (c) 2014 Mathias Buus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea that this file keeps getting vendored in? So I guess review comments on the code here make limited sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a semi-related note, I wish we could collect any/all vendored code under a single directory to better indicate that PRs/issues/etc. should not be opened in nodejs/node for those files.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can drop the copyright, as stated by @mafintosh in mafintosh/pump#17 (comment). Are you still of that idea, @mafintosh?

//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

'use strict';

function noop() {}

function isRequest(stream) {
return stream.setHeader && isFn(stream.abort);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a test that exercises http request and fs.

I think you can inline the typeof fs === 'function' check.


function isChildProcess(stream) {
return stream.stdio &&
Array.isArray(stream.stdio) && stream.stdio.length === 3;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a test for child processes as well.


function isFn(fn) {
return typeof fn === 'function';
}

function eos(stream, opts, _callback) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might also expose this, as it is another must-have.

if (typeof opts === 'function') return eos(stream, null, opts);
if (!opts) opts = {};
let callbackCalled = false;
const callback = (err) => {
if (!_callback || callbackCalled) {
return;
}
callbackCalled = true;
_callback.call(stream, err);
};
const ws = stream._writableState;
const rs = stream._readableState;
let readable = opts.readable ||
(opts.readable !== false && stream.readable);
let writable = opts.writable ||
(opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

const onfinish = () => {
writable = false;
if (!readable) callback();
};

const onend = () => {
readable = false;
if (!writable) callback();
};

const onexit = (exitCode) => {
if (exitCode) {
callback(new Error('exited with error code: ' + exitCode));
} else {
callback();
}
};

const onclose = () => {
if (readable && !(rs && rs.ended))
return callback(new Error('premature close'));

if (writable && !(ws && ws.ended))
return callback(new Error('premature close'));
};

const onrequest = () =>
stream.req.on('finish', onfinish);

if (isRequest(stream)) {
stream.on('complete', onfinish);
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !ws) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}

if (isChildProcess(stream)) stream.on('exit', onexit);

stream.on('end', onend);
stream.on('finish', onfinish);
if (opts.error !== false) stream.on('error', callback);
stream.on('close', onclose);

return () => {
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
stream.removeListener('request', onrequest);
if (stream.req) stream.req.removeListener('finish', onfinish);
stream.removeListener('end', onlegacyfinish);
stream.removeListener('close', onlegacyfinish);
stream.removeListener('finish', onfinish);
stream.removeListener('exit', onexit);
stream.removeListener('end', onend);
stream.removeListener('error', callback);
stream.removeListener('close', onclose);
};
}

function destroyer(stream, reading, writing, _callback) {
let callbackCalled = false;
const callback = (err) => {
if (callbackCalled) return;

callbackCalled = true;

return _callback(err);
};
let closed = false;
stream.on('close', () => {
closed = true;
});

eos(stream, {readable: reading, writable: writing}, (err) => {
if (err) return callback(err);
closed = true;
callback();
});

var destroyed = false;
return (err) => {
if (closed) return;
if (destroyed) return;
destroyed = true;

if (isRequest(stream))
return stream.abort();
// request.destroy just do .end - .abort is what we want

if (isFn(stream.destroy)) return stream.destroy(err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, this wont work on older streams, which is why it was not calling it with err before


callback(err || new Error('stream was destroyed'));
};
}

const call = (fn) => fn();
const callErr = (err) => (fn) => fn(err);
const pipe = (from, to) => from.pipe(to);

function pump(...streams) {
const callback = isFn(streams[streams.length - 1] || noop) &&
streams.pop() || noop;

if (Array.isArray(streams[0])) streams = streams[0];

if (streams.length < 2)
throw new Error('pump requires two streams per minimum');

let error;
const destroys = streams.map((stream, i) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer not to close over this  here, as it might be the global Stream  object.

var reading = i < streams.length - 1;
var writing = i > 0;
return destroyer(stream, reading, writing, (err) => {
if (!error) error = err;

if (err) destroys.forEach(callErr(err));

if (reading) return;

destroys.forEach(call);
callback(error);
});
});

return streams.reduce(pipe);
}

module.exports = pump;
1 change: 1 addition & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
Stream.pump = require('_stream_pump');

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;
Expand Down
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
'lib/_stream_duplex.js',
'lib/_stream_transform.js',
'lib/_stream_passthrough.js',
'lib/_stream_pump.js',
'lib/_stream_wrap.js',
'lib/string_decoder.js',
'lib/sys.js',
Expand Down
Loading