diff --git a/doc/api/stream.md b/doc/api/stream.md index 14be27cbf2d911..dd1eca3d5003e5 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -46,6 +46,8 @@ There are four fundamental stream types within Node.js: * [Transform][] - Duplex streams that can modify or transform the data as it is written and read (for example [`zlib.createDeflate()`][]). +Additionally this module includes the utility function [pump][]. + ### Object Mode All streams created by Node.js APIs operate exclusively on strings and `Buffer` @@ -89,7 +91,7 @@ total size of the internal write buffer is below the threshold set by the size of the internal buffer reaches or exceeds the `highWaterMark`, `false` will be returned. -A key goal of the `stream` API, particularly the [`stream.pipe()`] method, +A key goal of the `stream` API, particularly the [`stream.pump()`] function, is to limit the buffering of data to acceptable levels such that sources and destinations of differing speeds will not overwhelm the available memory. @@ -1244,6 +1246,18 @@ implementors should not override this method, but instead implement [`readable._destroy`][readable-_destroy]. The default implementation of `_destroy` for `Transform` also emit `'close'`. +#### Class Method: stream.pump(...streams[, callback]) + + +* `...streams` {Stream} Two or more streams to pipe between. +* `callback` {Function} A callback function that takes an optional error + argument. + +A class method to pipe between streams forwarding errors and properly cleaning +up. + ## API for Stream Implementers @@ -2334,14 +2348,15 @@ contain multi-byte characters. [TCP sockets]: net.html#net_class_net_socket [Transform]: #stream_class_stream_transform [Writable]: #stream_class_stream_writable +[async-iterator]: https://github.com/tc39/proposal-async-iteration [child process stdin]: child_process.html#child_process_subprocess_stdin [child process stdout and stderr]: child_process.html#child_process_subprocess_stdout [crypto]: crypto.html [fs read streams]: fs.html#fs_class_fs_readstream [fs write streams]: fs.html#fs_class_fs_writestream [http-incoming-message]: http.html#http_class_http_incomingmessage -[zlib]: zlib.html [hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding +[pump]: #stream_class_method_pump [stream-_flush]: #stream_transform_flush_callback [stream-_read]: #stream_readable_read_size_1 [stream-_transform]: #stream_transform_transform_chunk_encoding_callback @@ -2358,4 +2373,4 @@ contain multi-byte characters. [readable-destroy]: #stream_readable_destroy_error [writable-_destroy]: #stream_writable_destroy_err_callback [writable-destroy]: #stream_writable_destroy_error -[async-iterator]: https://github.com/tc39/proposal-async-iteration +[zlib]: zlib.html diff --git a/lib/_stream_pump.js b/lib/_stream_pump.js new file mode 100644 index 00000000000000..f8bfebb0c2f62b --- /dev/null +++ b/lib/_stream_pump.js @@ -0,0 +1,157 @@ +'use strict'; + +function noop() {} + +function isRequest(stream) { + return stream.setHeader && typeof stream.abort === 'function'; +} + +function isChildProcess(stream) { + return stream.stdio && + Array.isArray(stream.stdio) && stream.stdio.length === 3; +} + +function eos(stream, opts, _callback) { + if (typeof opts === 'function') return eos(stream, null, opts); + if (!opts) opts = {}; + + let callbackCalled = false; + const callback = (err) => { + if (!_callback || callbackCalled) { + return; + } + callbackCalled = true; + _callback.call(stream, err); + }; + + const ws = stream._writableState; + const rs = stream._readableState; + let readable = opts.readable || opts.readable !== false && stream.readable; + let writable = opts.writable || opts.writable !== false && stream.writable; + + const onlegacyfinish = () => { + if (!stream.writable) onfinish(); + }; + + const onfinish = () => { + writable = false; + if (!readable) callback(); + }; + + const onend = () => { + readable = false; + if (!writable) callback(); + }; + + const onexit = (exitCode) => { + if (exitCode) { + callback(new Error(`Exited with error code: ${exitCode}`)); + } else { + callback(null); + } + }; + + const onclose = () => { + if (readable && !(rs && rs.ended) || writable && !(ws && ws.ended)) + return callback(new Error('Premature close')); + }; + + const onrequest = () => + stream.req.on('finish', onfinish); + + if (isRequest(stream)) { + stream.on('complete', onfinish); + stream.on('abort', onclose); + if (stream.req) onrequest(); + else stream.on('request', onrequest); + } else if (writable && !ws) { // legacy streams + stream.on('end', onlegacyfinish); + stream.on('close', onlegacyfinish); + } + + if (isChildProcess(stream)) stream.on('exit', onexit); + + stream.on('end', onend); + stream.on('finish', onfinish); + if (opts.error !== false) stream.on('error', callback); + stream.on('close', onclose); + + return () => { + stream.removeListener('complete', onfinish); + stream.removeListener('abort', onclose); + stream.removeListener('request', onrequest); + if (stream.req) stream.req.removeListener('finish', onfinish); + stream.removeListener('end', onlegacyfinish); + stream.removeListener('close', onlegacyfinish); + stream.removeListener('finish', onfinish); + stream.removeListener('exit', onexit); + stream.removeListener('end', onend); + stream.removeListener('error', callback); + stream.removeListener('close', onclose); + }; +} + +function destroyer(stream, readable, writable, _callback) { + let callbackCalled = false; + const callback = (err) => { + if (callbackCalled) return; + callbackCalled = true; + return _callback(err); + }; + let closed = false; + stream.on('close', () => { + closed = true; + }); + + eos(stream, { readable, writable }, (err) => { + if (err) return callback(err); + closed = true; + callback(); + }); + + let destroyed = false; + return (err) => { + if (closed || destroyed) return; + destroyed = true; + + if (isRequest(stream)) + return stream.abort(); + + if (typeof stream.destroy === 'function') return stream.destroy(err); + + callback(err || new Error('Stream was destroyed')); + }; +} + +const call = (fn) => fn(); +const callErr = (err) => (fn) => fn(err); +const pipe = (from, to) => from.pipe(to); + +function pump(...streams) { + const callback = streams.pop() || noop; + + if (Array.isArray(streams[0])) streams = streams[0]; + + if (streams.length < 2) + throw new Error('Pump requires two streams per minimum.'); + + let firstError; + const destroys = streams.map((stream, i) => { + var reading = i < streams.length - 1; + var writing = i > 0; + return destroyer(stream, reading, writing, (err) => { + if (!firstError) firstError = err; + + if (err) destroys.forEach(callErr(err)); + + if (reading) return; + + destroys.forEach(call); + callback(firstError); + }); + }); + + return streams.reduce(pipe); +} + +module.exports = pump; diff --git a/lib/stream.js b/lib/stream.js index 9a816600a05e5a..6d241daa8fd165 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -32,6 +32,7 @@ Stream.Writable = require('_stream_writable'); Stream.Duplex = require('_stream_duplex'); Stream.Transform = require('_stream_transform'); Stream.PassThrough = require('_stream_passthrough'); +Stream.pump = require('_stream_pump'); // Backwards-compat with node 0.4.x Stream.Stream = Stream; diff --git a/node.gyp b/node.gyp index 08eee428560770..ce9390d9a533b2 100644 --- a/node.gyp +++ b/node.gyp @@ -66,6 +66,7 @@ 'lib/_stream_duplex.js', 'lib/_stream_transform.js', 'lib/_stream_passthrough.js', + 'lib/_stream_pump.js', 'lib/_stream_wrap.js', 'lib/string_decoder.js', 'lib/sys.js', diff --git a/test/parallel/test-stream-pump.js b/test/parallel/test-stream-pump.js new file mode 100644 index 00000000000000..d69f41924997d7 --- /dev/null +++ b/test/parallel/test-stream-pump.js @@ -0,0 +1,165 @@ +'use strict'; + +const stream = require('stream'); +const common = require('../common'); + +if (!common.hasCrypto) + common.skip('missing crypto'); + +const assert = require('assert'); +const crypto = require('crypto'); +const pump = stream.pump; +// tiny node-tap lookalike. +const tests = []; +let count = 0; + +function test(name, fn) { + count++; + tests.push([name, fn]); +} + +function run() { + const next = tests.shift(); + if (!next) + return console.error('ok'); + + const name = next[0]; + const fn = next[1]; + console.log('# %s', name); + fn({ + same: assert.deepStrictEqual, + equal: assert.strictEqual, + end: function() { + count--; + run(); + } + }); +} + +// ensure all tests have run +process.on('exit', function() { + assert.strictEqual(count, 0); +}); + +process.nextTick(run); + +test('basic pump', (t) => { + const rs = new stream.Readable({ + read(size) { + this.push(crypto.randomBytes(size)); + } + }); + const ws = new stream.Writable({ + write(chunk, enc, next) { + setImmediate(next); + } + }); + + function toHex() { + const reverse = new stream.Transform(); + + reverse._transform = function(chunk, enc, callback) { + reverse.push(chunk.toString('hex')); + callback(); + }; + + return reverse; + } + + let wsClosed = false; + let rsClosed = false; + let callbackCalled = false; + const timeout = setTimeout(function() { + throw new Error('timeout'); + }, 5000); + function check() { + if (wsClosed && rsClosed && callbackCalled) { + clearTimeout(timeout); + t.end(); + } + } + + ws.on('finish', function() { + wsClosed = true; + check(); + }); + + rs.on('end', function() { + rsClosed = true; + check(); + }); + + pump(rs, toHex(), toHex(), toHex(), ws, function() { + callbackCalled = true; + check(); + }); + + setTimeout(function() { + rs.push(null); + }, 1000); +}); +test('call destroy if error', (t) => { + let wsDestroyCalled = false; + let rsDestroyCalled = false; + let callbackCalled = false; + class RS extends stream.Readable { + _read(size) { + this.push(crypto.randomBytes(size)); + } + _destroy(err, cb) { + rsDestroyCalled = true; + check(); + super._destroy(err, cb); + } + } + class WS extends stream.Writable { + _write(chunk, enc, next) { + setImmediate(next); + } + _destroy(error, cb) { + t.equal(error && error.message, 'lets end this'); + this.emit('close'); + wsDestroyCalled = true; + check(); + super._destroy(error, cb); + } + } + const rs = new RS(); + const ws = new WS(); + function throwError() { + const reverse = new stream.Transform(); + let i = 0; + reverse._transform = function(chunk, enc, callback) { + i++; + if (i > 5) { + return callback(new Error('lets end this')); + } + reverse.push(chunk.toString('hex')); + callback(); + }; + + return reverse; + } + function toHex() { + const reverse = new stream.Transform(); + + reverse._transform = function(chunk, enc, callback) { + reverse.push(chunk.toString('hex')); + callback(); + }; + + return reverse; + } + + function check() { + if (wsDestroyCalled && rsDestroyCalled && callbackCalled) { + t.end(); + } + } + + pump(rs, toHex(), throwError(), toHex(), toHex(), ws, function(err) { + t.equal(err && err.message, 'lets end this'); + callbackCalled = true; + check(); + }); +});