From 6c91184878909c5e1b5f79829060a6a80b3c3133 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 22 Jul 2021 09:39:00 +0200 Subject: [PATCH 1/6] stream: introduce Body This introduce a new stream primitive called Body which helps with performance, ergonomics and compatibility when working with different types of data producers and consumers. Using Body it will be possible to delay converting streamlike objects as long as possible and enable some optimizations where we can avoid e.g. intermediate node streams. --- lib/internal/streams/body.js | 325 ++++++++++++++++++++++++ lib/internal/streams/compose.js | 119 ++------- lib/internal/streams/duplexify.js | 165 ++++++++++++ lib/internal/streams/pipeline.js | 12 + lib/internal/streams/utils.js | 20 ++ lib/stream.js | 1 + test/parallel/test-bootstrap-modules.js | 2 + test/parallel/test-stream-compose.js | 27 +- 8 files changed, 562 insertions(+), 109 deletions(-) create mode 100644 lib/internal/streams/body.js create mode 100644 lib/internal/streams/duplexify.js diff --git a/lib/internal/streams/body.js b/lib/internal/streams/body.js new file mode 100644 index 00000000000000..0cd287909c307c --- /dev/null +++ b/lib/internal/streams/body.js @@ -0,0 +1,325 @@ +'use strict'; + +const { Buffer, Blob } = require('buffer'); +const Duplex = require('internal/streams/duplex'); +const Readable = require('internal/streams/readable'); +const Writable = require('internal/streams/writable'); +const duplexify = require('internal/streams/duplexify'); +const { createDeferredPromise } = require('internal/util'); +const { destroyer } = require('internal/streams/destroy'); +const from = require('internal/streams/from'); +const assert = require('internal/assert'); + +const { + isBlob +} = require('internal/blob'); + +const { + isBrandCheck, +} = require('internal/webstreams/util'); + +const isReadableStream = + isBrandCheck('ReadableStream'); +const isWritableStream = + isBrandCheck('WritableStream'); + +const { + isIterable, + isDuplexNodeStream, + isReadableNodeStream, + isWritableNodeStream, +} = require('internal/streams/utils'); + +const { + JSONParse, + PromiseResolve, + Symbol, + SymbolAsyncIterator +} = primordials; + +const { + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_RETURN_VALUE, + ERR_INVALID_STATE, + }, +} = require('internal/errors'); + +const kState = Symbol('kState'); + +// This is needed for pre node 17. +class BodyDuplex extends Duplex { + constructor(options) { + super(options); + + // https://github.com/nodejs/node/pull/34385 + + if (options?.readable === false) { + this._readableState.readable = false; + this._readableState.ended = true; + this._readableState.endEmitted = true; + } + + if (options?.writable === false) { + this._writableState.writable = false; + this._writableState.ending = true; + this._writableState.ended = true; + this._writableState.finished = true; + } + } +} + +class Body { + constructor(body, options) { + // TODO (ronag): What about TransformStream? + + if (body[kState]) { + this[kState] = body[kState]; + } else if ( + isReadableStream(body?.readable) && + isWritableStream(body?.writable) + ) { + // TODO (ronag): Optimize. Delay conversion. + const d = Duplex.fromWeb(body, options); + this[kState] = { readable: d, writable: d }; + } else if (isWritableStream(body?.writable)) { + // TODO (ronag): Optimize. Delay conversion. + this[kState] = { + readable: undefined, + writable: Writable.fromWeb(body, options) + }; + } else if (isReadableStream(body?.readable)) { + // TODO (ronag): Optimize. Delay conversion. + this[kState] = { + readable: Readable.fromWeb(body, options), + writable: undefined + }; + } else if (isDuplexNodeStream(body)) { + this[kState] = { readable: body, writable: body }; + } else if (isReadableNodeStream(body)) { + this[kState] = { readable: body, writable: undefined }; + } else if (isWritableNodeStream(body)) { + this[kState] = { readable: undefined, writable: body }; + } else if (isReadableStream(body)) { + // TODO (ronag): Optimize. Delay conversion. + this[kState] = { + readable: Readable.fromWeb(body, options), + writable: undefined + }; + } else if (isWritableStream(body)) { + // TODO (ronag): Optimize. Delay conversion. + this[kState] = { + readable: undefined, + writable: Writable.fromWeb(body, options) + }; + } else if (typeof body === 'function') { + // TODO (ronag): Optimize. Delay conversion. + assert(body.length > 0); + + const { value, write, final } = fromAsyncGen(body); + + if (isIterable(value)) { + const d = from(BodyDuplex, value, { + objectMode: true, + highWaterMark: 1, + ...options, + write, + final + }); + + this[kState] = { readable: d, writable: d }; + } else if (typeof value?.then === 'function') { + let d; + + const promise = PromiseResolve(value) + .then((val) => { + if (val != null) { + throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); + } + }) + .catch((err) => { + destroyer(d, err); + }); + + d = new BodyDuplex({ + objectMode: true, + highWaterMark: 1, + ...options, + readable: false, + write, + final(cb) { + final(() => promise.then(cb, cb)); + } + }); + + this[kState] = { readable: d, writable: d }; + } else { + throw new ERR_INVALID_RETURN_VALUE( + 'Iterable, AsyncIterable or AsyncFunction', 'body', value); + } + } else if (isBlob(body)) { + // TODO (ronag): Optimize. Delay conversion. + const d = new Body(async function* () { + yield await body.arrayBuffer(); + }()).nodeStream(); + + this[kState] = { readable: d, writable: d }; + } else if (isIterable(body)) { + // TODO (ronag): Optimize. Delay conversion. + const d = from(BodyDuplex, body, { + objectMode: true, + highWaterMark: 1, + ...options, + writable: false + }); + + this[kState] = { readable: d, writable: d }; + } else if ( + typeof body?.writable === 'object' || + typeof body?.readable === 'object' + ) { + // TODO (ronag): Optimize. Delay conversion. + const readable = body?.readable ? + isReadableNodeStream(body?.readable) ? body?.readable : + new Body(body.readable).readableNodeStream() : undefined; + const writable = body?.writable ? + isWritableNodeStream(body?.writable) ? body?.writable : + new Body(body.writable).writableNodeStream() : undefined; + + this[kState] = { readable, writable }; + } else { + throw new ERR_INVALID_ARG_TYPE( + 'stream', + ['Blob', 'ReadableStream', 'WritableStream', 'Stream', 'Iterable', + 'AsyncIterable', 'Function', '{ readable, writable } pair'], + body) + ; + } + } + + get writable() { + return !!this[kState].writable; + } + + get readable() { + return !!this[kState].readable; + } + + readableNodeStream() { + const { readable } = this[kState]; + + if (readable === null) { + throw new ERR_INVALID_STATE('read lock'); + } + + this[kState].readable = null; + + // TODO (ronag): Hide Writable interface. + return readable ?? new BodyDuplex({ readable: false, writable: false }); + } + + writableNodeStream() { + const { writable } = this[kState]; + + if (writable === null) { + throw new ERR_INVALID_STATE('write lock'); + } + + this[kState].writable = null; + + // TODO (ronag): Hide Readable interface. + return writable ?? new BodyDuplex({ readable: false, writable: false }); + } + + nodeStream() { + if (this.readable === null) { + throw new ERR_INVALID_STATE('read lock'); + } + + if (this.writable === null) { + throw new ERR_INVALID_STATE('write lock'); + } + + if (this[kState].readable === this[kState].writable) { + const d = this[kState].readable; + this[kState].readable = null; + this[kState].writable = null; + return d; + } + + return duplexify({ + readable: this.readableNodeStream(), + writable: this.writableNodeStream(), + }); + } + + readableWebStream() { + return this.readableWebStream().asWeb(); + } + + writableWebStream() { + return this.writableNodeStream().asWeb(); + } + + [SymbolAsyncIterator]() { + return this.readableNodeStream()[SymbolAsyncIterator](); + } + + async blob() { + const sources = []; + for await (const chunk of this.readableNodeStream()) { + sources.push(chunk); + } + return new Blob(sources); + } + + async buffer() { + const sources = []; + for await (const chunk of this.readableNodeStream()) { + sources.push(chunk); + } + return Buffer.concat(sources); + } + + async arrayBuffer() { + const blob = await this.blob(); + return blob.arrayBuffer(); + } + + async text() { + let ret = ''; + for await (const chunk of this.readableNodeStream()) { + ret += chunk; + } + return ret; + } + + async json() { + return JSONParse(await this.text()); + } +} + +function fromAsyncGen(fn) { + let { promise, resolve } = createDeferredPromise(); + const value = fn(async function*() { + while (true) { + const { chunk, done, cb } = await promise; + process.nextTick(cb); + if (done) return; + yield chunk; + ({ promise, resolve } = createDeferredPromise()); + } + }()); + + return { + value, + write(chunk, encoding, cb) { + resolve({ chunk, done: false, cb }); + }, + final(cb) { + resolve({ done: true, cb }); + } + }; +} + +module.exports = Body; diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index bdfdc4cebe0c47..c450146ac00c32 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -2,28 +2,21 @@ const pipeline = require('internal/streams/pipeline'); const Duplex = require('internal/streams/duplex'); -const { createDeferredPromise } = require('internal/util'); const { destroyer } = require('internal/streams/destroy'); -const from = require('internal/streams/from'); -const { - isNodeStream, - isIterable, - isReadable, - isWritable, -} = require('internal/streams/utils'); -const { - PromiseResolve, -} = primordials; +const Body = require('internal/streams/body'); + const { AbortError, codes: { - ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, - ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, }, } = require('internal/errors'); -const assert = require('internal/assert'); +const { + isNodeStream, + isReadable, + isWritable, +} = require('internal/streams/utils'); // This is needed for pre node 17. class ComposeDuplex extends Duplex { @@ -53,21 +46,23 @@ module.exports = function compose(...streams) { } if (streams.length === 1) { - return makeDuplex(streams[0], 'streams[0]'); + return new Body(streams[0]); } const orgStreams = [...streams]; if (typeof streams[0] === 'function') { - streams[0] = makeDuplex(streams[0], 'streams[0]'); + streams[0] = new Body(streams[0]).nodeStream(); } if (typeof streams[streams.length - 1] === 'function') { const idx = streams.length - 1; - streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`); + streams[idx] = new Body(streams[idx]); } for (let n = 0; n < streams.length; ++n) { + // TODO(ronag): Relax requirements. + if (!isNodeStream(streams[n])) { // TODO(ronag): Add checks for non streams. continue; @@ -113,15 +108,12 @@ module.exports = function compose(...streams) { const writable = !!isWritable(head); const readable = !!isReadable(tail); - // TODO(ronag): Avoid double buffering. - // Implement Writable/Readable/Duplex traits. - // See, https://github.com/nodejs/node/pull/33515. d = new ComposeDuplex({ - highWaterMark: 1, + // TODO (ronag): highWaterMark? + readableObjectMode: !!tail?.readableObjectMode, writableObjectMode: !!head?.writableObjectMode, - readableObjectMode: !!tail?.writableObjectMode, - writable, readable, + writable, }); if (writable) { @@ -201,84 +193,5 @@ module.exports = function compose(...streams) { } }; - return d; + return new Body(d); }; - -function makeDuplex(stream, name) { - let ret; - if (typeof stream === 'function') { - assert(stream.length > 0); - - const { value, write, final } = fromAsyncGen(stream); - - if (isIterable(value)) { - ret = from(ComposeDuplex, value, { - objectMode: true, - highWaterMark: 1, - write, - final - }); - } else if (typeof value?.then === 'function') { - const promise = PromiseResolve(value) - .then((val) => { - if (val != null) { - throw new ERR_INVALID_RETURN_VALUE('nully', name, val); - } - }) - .catch((err) => { - destroyer(ret, err); - }); - - ret = new ComposeDuplex({ - objectMode: true, - highWaterMark: 1, - readable: false, - write, - final(cb) { - final(() => promise.then(cb, cb)); - } - }); - } else { - throw new ERR_INVALID_RETURN_VALUE( - 'Iterable, AsyncIterable or AsyncFunction', name, value); - } - } else if (isNodeStream(stream)) { - ret = stream; - } else if (isIterable(stream)) { - ret = from(ComposeDuplex, stream, { - objectMode: true, - highWaterMark: 1, - writable: false - }); - } else { - throw new ERR_INVALID_ARG_TYPE( - name, - ['Stream', 'Iterable', 'AsyncIterable', 'Function'], - stream) - ; - } - return ret; -} - -function fromAsyncGen(fn) { - let { promise, resolve } = createDeferredPromise(); - const value = fn(async function*() { - while (true) { - const { chunk, done, cb } = await promise; - process.nextTick(cb); - if (done) return; - yield chunk; - ({ promise, resolve } = createDeferredPromise()); - } - }()); - - return { - value, - write(chunk, encoding, cb) { - resolve({ chunk, done: false, cb }); - }, - final(cb) { - resolve({ done: true, cb }); - } - }; -} diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js new file mode 100644 index 00000000000000..a3a503d170f4d6 --- /dev/null +++ b/lib/internal/streams/duplexify.js @@ -0,0 +1,165 @@ +'use strict'; + +const { + isReadable, + isWritable, +} = require('internal/streams/utils'); +const eos = require('internal/streams/end-of-stream'); +const { + AbortError, +} = require('internal/errors'); +const { destroyer } = require('internal/streams/destroy'); +const Duplex = require('internal/streams/duplex'); + +// This is needed for pre node 17. +class DuplexifyDuplex extends Duplex { + constructor(options) { + super(options); + + // https://github.com/nodejs/node/pull/34385 + + if (options?.readable === false) { + this._readableState.readable = false; + this._readableState.ended = true; + this._readableState.endEmitted = true; + } + + if (options?.writable === false) { + this._writableState.writable = false; + this._writableState.ending = true; + this._writableState.ended = true; + this._writableState.finished = true; + } + } +} + +module.exports = function duplexify(pair) { + const r = pair.readable; + const w = pair.writable; + + let readable = !!isReadable(r); + let writable = !!isWritable(w); + + let ondrain; + let onfinish; + let onreadable; + let onclose; + let d; + + function onfinished(err) { + const cb = onclose; + onclose = null; + + if (cb) { + cb(err); + } else if (err) { + d.destroy(err); + } else if (!readable && !writable) { + d.destroy(); + } + } + + eos(r, (err) => { + readable = false; + if (err) { + destroyer(w, err); + } + onfinished(err); + }); + + eos(w, (err) => { + writable = false; + if (err) { + destroyer(r, err); + } + onfinished(err); + }); + + d = new DuplexifyDuplex({ + // TODO (ronag): highWaterMark? + readableObjectMode: !!r?.readableObjectMode, + writableObjectMode: !!w?.writableObjectMode, + readable, + writable, + }); + + if (writable) { + d._write = function(chunk, encoding, callback) { + if (w.write(chunk, encoding)) { + callback(); + } else { + ondrain = callback; + } + }; + + d._final = function(callback) { + w.end(); + onfinish = callback; + }; + + w.on('drain', function() { + if (ondrain) { + const cb = ondrain; + ondrain = null; + cb(); + } + }); + + w.on('finish', function() { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + }); + } + + if (readable) { + r.on('readable', function() { + if (onreadable) { + const cb = onreadable; + onreadable = null; + cb(); + } + }); + + r.on('end', function() { + d.push(null); + }); + + d._read = function() { + while (true) { + const buf = r.read(); + + if (buf === null) { + onreadable = d._read; + return; + } + + if (!d.push(buf)) { + return; + } + } + }; + } + + d._destroy = function(err, callback) { + if (!err && onclose !== null) { + err = new AbortError(); + } + + onreadable = null; + ondrain = null; + onfinish = null; + + if (onclose === null) { + callback(err); + } else { + onclose = callback; + destroyer(w, err); + destroyer(r, err); + } + }; + + return d; +}; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 1b56c08b9e6958..92a533b6d4b909 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -29,6 +29,7 @@ const { isIterable, isReadableNodeStream, isNodeStream, + isBody } = require('internal/streams/utils'); let PassThrough; @@ -198,6 +199,17 @@ function pipeline(...streams) { } } + for (let i = 0; i < streams.length; ++i) { + if (isBody(streams[i])) { + // TODO: Optimize by avoiding always converting into node stream. + streams[i] = i === 0 ? + streams[i].readableNodeStream() : + i === streams.length - 1 ? + streams[i].writableNodeStream() : + streams[i].nodeStream(); + } + } + let ret; for (let i = 0; i < streams.length; i++) { const stream = streams[i]; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 01396b5113340f..c8df7b86c99534 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -8,6 +8,15 @@ const { const kDestroyed = Symbol('kDestroyed'); +function isBody(obj) { + return !!( + obj && + typeof obj.nodeStream === 'function' && + typeof obj.writableNodeStream === 'function' && + typeof obj.readableNodeStream === 'function' + ); +} + function isReadableNodeStream(obj) { return !!( obj && @@ -27,6 +36,15 @@ function isWritableNodeStream(obj) { ); } +function isDuplexNodeStream(obj) { + return !!( + obj && + typeof obj.pipe === 'function' && + typeof obj.on === 'function' && + typeof obj.write === 'function' + ); +} + function isNodeStream(obj) { return ( obj && @@ -197,8 +215,10 @@ function willEmitClose(stream) { module.exports = { kDestroyed, + isBody, isClosed, isDestroyed, + isDuplexNodeStream, isFinished, isIterable, isReadable, diff --git a/lib/stream.js b/lib/stream.js index b84efb0fd8862d..00c9ac40d0fd45 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -39,6 +39,7 @@ const promises = require('stream/promises'); const Stream = module.exports = require('internal/streams/legacy').Stream; Stream.Readable = require('internal/streams/readable'); +Stream.Body = require('internal/streams/body'); Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index d02f0c71860554..345cc2092e4952 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -101,9 +101,11 @@ const expectedModules = new Set([ 'NativeModule internal/stream_base_commons', 'NativeModule internal/streams/add-abort-signal', 'NativeModule internal/streams/buffer_list', + 'NativeModule internal/streams/body', 'NativeModule internal/streams/compose', 'NativeModule internal/streams/destroy', 'NativeModule internal/streams/duplex', + 'NativeModule internal/streams/duplexify', 'NativeModule internal/streams/end-of-stream', 'NativeModule internal/streams/from', 'NativeModule internal/streams/legacy', diff --git a/test/parallel/test-stream-compose.js b/test/parallel/test-stream-compose.js index c3d52e08e0048e..f4effbe0ae8570 100644 --- a/test/parallel/test-stream-compose.js +++ b/test/parallel/test-stream-compose.js @@ -27,6 +27,7 @@ const assert = require('assert'); }) }) ) + .nodeStream() .end('asd') .on('data', common.mustCall((buf) => { res += buf; @@ -50,6 +51,7 @@ const assert = require('assert'); } } ) + .nodeStream() .end('asd') .on('data', common.mustCall((buf) => { res += buf; @@ -68,6 +70,7 @@ const assert = require('assert'); } } ) + .nodeStream() .end('asd') .on('data', common.mustCall((buf) => { res += buf; @@ -87,6 +90,7 @@ const assert = require('assert'); }) }) ) + .nodeStream() .on('data', common.mustCall((buf) => { res += buf; })) @@ -107,6 +111,7 @@ const assert = require('assert'); }) }) ) + .nodeStream() .on('data', common.mustCall((buf) => { res += buf; })) @@ -135,6 +140,7 @@ const assert = require('assert'); }) }) ) + .nodeStream() .end('asd') .on('finish', common.mustCall(() => { assert.strictEqual(res, 'ASD'); @@ -160,6 +166,7 @@ const assert = require('assert'); } } ) + .nodeStream() .end('asd') .on('finish', common.mustCall(() => { assert.strictEqual(res, 'ASD'); @@ -187,6 +194,7 @@ const assert = require('assert'); }) }) ) + .nodeStream() .end(true) .on('data', common.mustCall((buf) => { res = buf; @@ -217,6 +225,7 @@ const assert = require('assert'); }) }) ) + .nodeStream() .end(true) .on('data', common.mustNotCall()) .on('end', common.mustNotCall()) @@ -249,6 +258,7 @@ const assert = require('assert'); }) }) ) + .nodeStream() .end(true) .on('data', common.mustNotCall()) .on('end', common.mustNotCall()) @@ -272,7 +282,8 @@ const assert = require('assert'); for await (const chunk of source) { buf += chunk; } - }); + }) + .nodeStream(); assert.strictEqual(s1.writable, false); assert.strictEqual(s1.readable, false); @@ -290,7 +301,8 @@ const assert = require('assert'); for await (const chunk of source) { yield String(chunk).toUpperCase(); } - }); + }) + .nodeStream(); s2.end('helloworld'); s2.resume(); s2.on('data', (chunk) => { @@ -326,7 +338,7 @@ const assert = require('assert'); } }); - const s4 = compose(s1, s2, s3); + const s4 = compose(s1, s2, s3).nodeStream(); finished(s4, common.mustCall((err) => { assert(!err); @@ -349,7 +361,8 @@ const assert = require('assert'); for await (const chunk of source) { buf += chunk; } - }); + }) + .nodeStream(); finished(s1, common.mustCall((err) => { assert(!err); @@ -397,7 +410,8 @@ const assert = require('assert'); buf += chunk; } return buf; - }); + }) + .nodeStream(); finished(s1, common.mustCall((err) => { assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); @@ -416,7 +430,8 @@ const assert = require('assert'); for await (const chunk of source) { buf += chunk; } - }); + }) + .nodeStream(); finished(s1, common.mustCall((err) => { assert(!err); From e774e3d9dca187e2d23bdca98b5f22ee22346ece Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 22 Jul 2021 14:03:31 +0200 Subject: [PATCH 2/6] fixup --- doc/api/stream.md | 179 ++++++++++++++++++++++++-------- lib/internal/streams/body.js | 8 -- lib/internal/streams/compose.js | 11 +- 3 files changed, 141 insertions(+), 57 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 5cc312abb42ccb..0a1f689a632749 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1722,6 +1722,141 @@ const cleanup = finished(rs, (err) => { // ... }); ``` +#### Class: `stream.Body` + + + + +`new stream.Body` can be used to seamlessly convert between different types of +read/write stream interfaces, including async functions and iterables. + +* `AsyncIterable` converts into a readable. Cannot yield + `null`. +* `AsyncGeneratorFunction` converts into a readable/writable. + Must take a source `AsyncIterable` as first parameter. Cannot yield + `null`. +* `AsyncFunction` converts into a writable. Must return + either `null` or `undefined`. + +```mjs +import { Body, compose } from 'stream'; +import { finished } from 'stream/promises'; +// Convert AsyncIterable into readable Duplex. +const s1 = new Body(async function*() { + yield 'Hello'; + yield 'World'; +}()); +// Convert AsyncGenerator into transform Duplex. +const s2 = new Body(async function*(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } +}); +let res = ''; +// Convert AsyncFunction into writable Duplex. +const s3 = new Body(async function(source) { + for await (const chunk of source) { + res += chunk; + } +}); +await finished(compose(s1, s2, s3).toNodeStream()); +console.log(res); // prints 'HELLOWORLD' +``` + +### `body.arrayBuffer()` + + +* Returns: {Promise} + +Returns a promise that fulfills with an {ArrayBuffer} containing a copy of +the body data. + +### `body.blob()` + + +* Returns: {Promise} + +Returns a promise that fulfills with an {Blob} containing a copy of the body data. + +### `body.buffer()` + + +* Returns: {Promise} + +Returns a promise that fulfills with an {Buffer} containing a copy of the body data. + +### `body.nodeStream()` + + +* Returns: {Duplex} + +Returns the a `stream.Duplex`. + +### `body.readableNodeStream()` + + +* Returns: {Readable} + +Returns the a `stream.Readable`. + +### `body.writableNodeStream()` + + +* Returns: {Readable} + +Returns the a `stream.Writable`. + +### `body.readableWebStream()` + + +* Returns: {ReadableStream} + +Returns the a `web.ReadableStream`. + +### `body.writableWebStream()` + + +* Returns: {WritableStream} + +Returns the a `web.WritableStream`. + +### `body.text()` + + +* Returns: {Promise} + +Returns a promise that resolves the contents of the body decoded as a UTF-8 +string. + +### `body.json()` + + +* Returns: {Promise} + +Returns a promise that resolves the contents of the body parsed as a UTF-8 +JSON. ### `stream.pipeline(source[, ...transforms], destination, callback)` ### `stream.pipeline(streams, callback)` @@ -1861,7 +1996,7 @@ failure, this can cause event listener leaks and swallowed errors. added: REPLACEME --> -* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]} +* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]|Body[]} * Returns: {stream.Duplex} Combines two or more streams into a `Duplex` stream that writes to the @@ -1901,48 +2036,6 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { console.log(res); // prints 'HELLOWORLD' ``` -`stream.compose` can be used to convert async iterables, generators and -functions into streams. - -* `AsyncIterable` converts into a readable `Duplex`. Cannot yield - `null`. -* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`. - Must take a source `AsyncIterable` as first parameter. Cannot yield - `null`. -* `AsyncFunction` converts into a writable `Duplex`. Must return - either `null` or `undefined`. - -```mjs -import { compose } from 'stream'; -import { finished } from 'stream/promises'; - -// Convert AsyncIterable into readable Duplex. -const s1 = compose(async function*() { - yield 'Hello'; - yield 'World'; -}()); - -// Convert AsyncGenerator into transform Duplex. -const s2 = compose(async function*(source) { - for await (const chunk of source) { - yield String(chunk).toUpperCase(); - } -}); - -let res = ''; - -// Convert AsyncFunction into writable Duplex. -const s3 = compose(async function(source) { - for await (const chunk of source) { - res += chunk; - } -}); - -await finished(compose(s1, s2, s3)); - -console.log(res); // prints 'HELLOWORLD' -``` - ### `stream.Readable.from(iterable, [options])`