From 66d5378368dfe971ad0924ad8adb62e19876a1cb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 22 Jul 2021 09:39:00 +0200 Subject: [PATCH] stream: introduce Body This introduces a new stream primitive called Body, which helps with performance, ergonomics and compatibility when working with different types of data producers and consumers. Using Body, it will be possible to delay converting stream-like objects for as long as possible and to enable some optimizations where we can avoid, e.g., intermediate Node.js streams. --- lib/internal/streams/body.js | 421 +++++++++++++++++++++++++++++++ lib/internal/streams/compose.js | 239 +----------------- lib/internal/streams/pipeline.js | 8 + lib/internal/streams/utils.js | 13 + lib/stream.js | 1 + 5 files changed, 455 insertions(+), 227 deletions(-) create mode 100644 lib/internal/streams/body.js diff --git a/lib/internal/streams/body.js b/lib/internal/streams/body.js new file mode 100644 index 00000000000000..954400b6d62d62 --- /dev/null +++ b/lib/internal/streams/body.js @@ -0,0 +1,421 @@ +'use strict'; + +const { Buffer, Blob } = require('buffer'); +const Duplex = require('internal/streams/duplex'); +const Readable = require('internal/streams/readable'); +const Writable = require('internal/streams/writable'); +const eos = require('internal/streams/end-of-stream'); +const { createDeferredPromise } = require('internal/util'); +const { destroyer } = require('internal/streams/destroy'); +const from = require('internal/streams/from'); +const assert = require('internal/assert'); + +const { + isBlob +} = require('internal/blob'); + +const { + isReadableStream, +} = require('internal/webstreams/readablestream'); + +const { + isWritableStream, +} = require('internal/webstreams/writablestream'); + +const { + isTransformStream, +} = require('internal/webstreams/transformstream'); + +const { + isIterable, + isDuplexNodeStream, + isReadable, + isReadableNodeStream, + isWritableNodeStream, + isWritable, +} = require('internal/streams/utils'); + 
+const { + Error, + JSONParse, + PromiseResolve, + Symbol, + SymbolAsyncIterator +} = primordials; + +const { + AbortError, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_RETURN_VALUE, + }, +} = require('internal/errors'); + +const kState = Symbol('kState'); + +// This is needed for pre node 17. +class BodyDuplex extends Duplex { + constructor(options) { + super(options); + + // https://github.com/nodejs/node/pull/34385 + + if (options?.readable === false) { + this._readableState.readable = false; + this._readableState.ended = true; + this._readableState.endEmitted = true; + } + + if (options?.writable === false) { + this._writableState.writable = false; + this._writableState.ending = true; + this._writableState.ended = true; + this._writableState.finished = true; + } + } +} + +class Body { + constructor(body) { + if ( + typeof body?.writable === 'object' || + typeof body?.readable === 'object' + ) { + // TODO (ronag): Optimize. Delay conversion. + this[kState] = { + readable: body?.readable ? + new Body(body.readable).readableNodeStream() : undefined, + writable: body?.writable ? + new Body(body.writable).writableNodeStream() : undefined, + }; + } else if (isDuplexNodeStream(body)) { + this[kState] = { readable: body, writable: body }; + } else if (isReadableNodeStream(body)) { + this[kState] = { readable: body, writable: undefined }; + } else if (isWritableNodeStream(body)) { + this[kState] = { readable: undefined, writable: body }; + } else if (isReadableStream(body)) { + // TODO (ronag): Optimize. Delay conversion. + this[kState] = { readable: Readable.fromWeb(body), writable: undefined }; + } else if (isWritableStream(body)) { + // TODO (ronag): Optimize. Delay conversion. + this[kState] = { readable: undefined, writable: Writable.fromWeb(body) }; + } else if (isTransformStream(body)) { + // TODO (ronag): Optimize. Delay conversion. 
+ const d = BodyDuplex.fromWeb(body); + this[kState] = { readable: d, writable: d }; + } else if (typeof body === 'function') { + // TODO (ronag): Optimize. Delay conversion. + assert(body.length > 0); + + const { value, write, final } = fromAsyncGen(body); + + if (isIterable(value)) { + const d = from(BodyDuplex, value, { + objectMode: true, + highWaterMark: 1, + write, + final + }); + + this[kState] = { readable: d, writable: d }; + } else if (typeof value?.then === 'function') { + let d; + + const promise = PromiseResolve(value) + .then((val) => { + if (val != null) { + throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); + } + }) + .catch((err) => { + destroyer(d, err); + }); + + d = new BodyDuplex({ + objectMode: true, + highWaterMark: 1, + readable: false, + write, + final(cb) { + final(() => promise.then(cb, cb)); + } + }); + + this[kState] = { readable: d, writable: d }; + } else { + throw new ERR_INVALID_RETURN_VALUE( + 'Iterable, AsyncIterable or AsyncFunction', 'body', value); + } + } else if (isBlob(body)) { + // TODO (ronag): Optimize. Delay conversion. + const d = new Body(async function* () { + yield await body.arrayBuffer(); + }()).nodeStream(); + + this[kState] = { readable: d, writable: d }; + } else if (isIterable(body)) { + // TODO (ronag): Optimize. Delay conversion. + const d = from(BodyDuplex, body, { + objectMode: true, + highWaterMark: 1, + writable: false + }); + + this[kState] = { readable: d, writable: d }; + } else { + throw new ERR_INVALID_ARG_TYPE( + 'stream', + ['Blob', 'ReadableStream', 'WritableStream', 'Stream', 'Iterable', + 'AsyncIterable', 'Function', '{ readable, writable } pair'], + body) + ; + } + } + + get writable() { + return this[kState].writable !== null; + } + + get readable() { + return this[kState].readable !== null; + } + + readableNodeStream() { + const { readable } = this[kState]; + + if (readable === null) { + // TODO: What error? 
+ throw new Error('readable consumed'); + } + + this[kState].readable = null; + + return readable ?? new BodyDuplex({ readable: false, writable: false }); + } + + writableNodeStream() { + const { writable } = this[kState]; + + if (writable === null) { + // TODO: What error? + throw new Error('writable consumed'); + } + + this[kState].writable = null; + + return writable ?? new BodyDuplex({ readable: false, writable: false }); + } + + nodeStream() { + if (this[kState].readable === null) { // Getter returns a boolean. + // TODO: What error? + throw new Error('readable consumed'); + } + + if (this[kState].writable === null) { // Getter returns a boolean. + // TODO: What error? + throw new Error('writable consumed'); + } + + if (this[kState].readable === this[kState].writable) { + const d = this[kState].readable; + this[kState].readable = null; + this[kState].writable = null; + return d; + } + + const r = this.readableNodeStream(); + const w = this.writableNodeStream(); + + let readable = !!isReadable(r); + let writable = !!isWritable(w); + + let ondrain; + let onfinish; + let onreadable; + let onclose; + let d; + + function onfinished(err) { + const cb = onclose; + onclose = null; + + if (cb) { + cb(err); + } else if (err) { + d.destroy(err); + } else if (!readable && !writable) { + d.destroy(); + } + } + + eos(r, (err) => { + readable = false; + onfinished(err); + }); + + eos(w, (err) => { + writable = false; + onfinished(err); + }); + + d = new BodyDuplex({ + highWaterMark: 1, + readableObjectMode: !!r?.writableObjectMode, + writableObjectMode: !!w?.writableObjectMode, + readable, + writable, + }); + + if (writable) { + d._write = function(chunk, encoding, callback) { + if (w.write(chunk, encoding)) { + callback(); + } else { + ondrain = callback; + } + }; + + d._final = function(callback) { + w.end(); + onfinish = callback; + }; + + w.on('drain', function() { + if (ondrain) { + const cb = ondrain; + ondrain = null; + cb(); + } + }); + + r.on('finish', function() { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + 
}); + } + + if (readable) { + r.on('readable', function() { + if (onreadable) { + const cb = onreadable; + onreadable = null; + cb(); + } + }); + + r.on('end', function() { + d.push(null); + }); + + d._read = function() { + while (true) { + const buf = r.read(); + + if (buf === null) { + onreadable = d._read; + return; + } + + if (!d.push(buf)) { + return; + } + } + }; + } + + d._destroy = function(err, callback) { + if (!err && onclose !== null) { + err = new AbortError(); + } + + onreadable = null; + ondrain = null; + onfinish = null; + + if (onclose === null) { + callback(err); + } else { + onclose = callback; + destroyer(w, err); + destroyer(r, err); + } + }; + + return d; + } + + readableWebStream() { // Consumes the readable side. + return Readable.toWeb(this.readableNodeStream()); + } + + writableWebStream() { // Consumes the writable side. + return Writable.toWeb(this.writableNodeStream()); + } + + [SymbolAsyncIterator]() { + return this.readableNodeStream()[SymbolAsyncIterator](); + } + + async blob() { + const sources = []; + for await (const chunk of this.readableNodeStream()) { + sources.push(chunk); + } + return new Blob(sources); + } + + async buffer() { + const sources = []; + for await (const chunk of this.readableNodeStream()) { + sources.push(chunk); + } + return Buffer.concat(sources); + } + + async arrayBuffer() { + const blob = await this.blob(); + return blob.arrayBuffer(); + } + + async text() { + let ret = ''; + for await (const chunk of this.readableNodeStream()) { + ret += chunk; + } + return ret; + } + + async json() { + return JSONParse(await this.text()); + } +} + +function fromAsyncGen(fn) { + let { promise, resolve } = createDeferredPromise(); + const value = fn(async function*() { + while (true) { + const { chunk, done, cb } = await promise; + process.nextTick(cb); + if (done) return; + yield chunk; + ({ promise, resolve } = createDeferredPromise()); + } + }()); + + return { + value, + write(chunk, encoding, cb) { + resolve({ chunk, done: false, cb }); + }, + final(cb) { + resolve({ done: true, cb }); + } + }; 
+} + +module.exports = Body; diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index bdfdc4cebe0c47..bfcb4692b18c1e 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -1,51 +1,18 @@ 'use strict'; const pipeline = require('internal/streams/pipeline'); -const Duplex = require('internal/streams/duplex'); -const { createDeferredPromise } = require('internal/util'); -const { destroyer } = require('internal/streams/destroy'); -const from = require('internal/streams/from'); +const Body = require('internal/streams/body'); const { isNodeStream, - isIterable, isReadable, isWritable, } = require('internal/streams/utils'); const { - PromiseResolve, -} = primordials; -const { - AbortError, codes: { - ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, - ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, }, } = require('internal/errors'); -const assert = require('internal/assert'); - -// This is needed for pre node 17. -class ComposeDuplex extends Duplex { - constructor(options) { - super(options); - - // https://github.com/nodejs/node/pull/34385 - - if (options?.readable === false) { - this._readableState.readable = false; - this._readableState.ended = true; - this._readableState.endEmitted = true; - } - - if (options?.writable === false) { - this._writableState.writable = false; - this._writableState.ending = true; - this._writableState.ended = true; - this._writableState.finished = true; - } - } -} module.exports = function compose(...streams) { if (streams.length === 0) { @@ -53,25 +20,28 @@ module.exports = function compose(...streams) { } if (streams.length === 1) { - return makeDuplex(streams[0], 'streams[0]'); + return new Body(streams[0]); // Body is a class; needs new. } const orgStreams = [...streams]; if (typeof streams[0] === 'function') { - streams[0] = makeDuplex(streams[0], 'streams[0]'); + streams[0] = new Body(streams[0]); } if (typeof streams[streams.length - 1] === 'function') { const idx = streams.length - 1; - streams[idx] = 
makeDuplex(streams[idx], `streams[${idx}]`); + streams[idx] = new Body(streams[idx]); } for (let n = 0; n < streams.length; ++n) { + // TODO(ronag): Relax requirements. + if (!isNodeStream(streams[n])) { // TODO(ronag): Add checks for non streams. continue; } + if (n < streams.length - 1 && !isReadable(streams[n])) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, @@ -79,6 +49,7 @@ module.exports = function compose(...streams) { 'must be readable' ); } + if (n > 0 && !isWritable(streams[n])) { throw new ERR_INVALID_ARG_VALUE( `streams[${n}]`, @@ -88,197 +59,11 @@ module.exports = function compose(...streams) { } } - let ondrain; - let onfinish; - let onreadable; - let onclose; - let d; - - function onfinished(err) { - const cb = onclose; - onclose = null; - - if (cb) { - cb(err); - } else if (err) { - d.destroy(err); - } else if (!readable && !writable) { - d.destroy(); - } - } - const head = streams[0]; - const tail = pipeline(streams, onfinished); - - const writable = !!isWritable(head); - const readable = !!isReadable(tail); + const tail = pipeline(streams, () => {}); - // TODO(ronag): Avoid double buffering. - // Implement Writable/Readable/Duplex traits. - // See, https://github.com/nodejs/node/pull/33515. 
- d = new ComposeDuplex({ - highWaterMark: 1, - writableObjectMode: !!head?.writableObjectMode, - readableObjectMode: !!tail?.writableObjectMode, - writable, - readable, + return new Body({ + writable: head, + readable: tail }); - - if (writable) { - d._write = function(chunk, encoding, callback) { - if (head.write(chunk, encoding)) { - callback(); - } else { - ondrain = callback; - } - }; - - d._final = function(callback) { - head.end(); - onfinish = callback; - }; - - head.on('drain', function() { - if (ondrain) { - const cb = ondrain; - ondrain = null; - cb(); - } - }); - - tail.on('finish', function() { - if (onfinish) { - const cb = onfinish; - onfinish = null; - cb(); - } - }); - } - - if (readable) { - tail.on('readable', function() { - if (onreadable) { - const cb = onreadable; - onreadable = null; - cb(); - } - }); - - tail.on('end', function() { - d.push(null); - }); - - d._read = function() { - while (true) { - const buf = tail.read(); - - if (buf === null) { - onreadable = d._read; - return; - } - - if (!d.push(buf)) { - return; - } - } - }; - } - - d._destroy = function(err, callback) { - if (!err && onclose !== null) { - err = new AbortError(); - } - - onreadable = null; - ondrain = null; - onfinish = null; - - if (onclose === null) { - callback(err); - } else { - onclose = callback; - destroyer(tail, err); - } - }; - - return d; }; - -function makeDuplex(stream, name) { - let ret; - if (typeof stream === 'function') { - assert(stream.length > 0); - - const { value, write, final } = fromAsyncGen(stream); - - if (isIterable(value)) { - ret = from(ComposeDuplex, value, { - objectMode: true, - highWaterMark: 1, - write, - final - }); - } else if (typeof value?.then === 'function') { - const promise = PromiseResolve(value) - .then((val) => { - if (val != null) { - throw new ERR_INVALID_RETURN_VALUE('nully', name, val); - } - }) - .catch((err) => { - destroyer(ret, err); - }); - - ret = new ComposeDuplex({ - objectMode: true, - highWaterMark: 1, - 
readable: false, - write, - final(cb) { - final(() => promise.then(cb, cb)); - } - }); - } else { - throw new ERR_INVALID_RETURN_VALUE( - 'Iterable, AsyncIterable or AsyncFunction', name, value); - } - } else if (isNodeStream(stream)) { - ret = stream; - } else if (isIterable(stream)) { - ret = from(ComposeDuplex, stream, { - objectMode: true, - highWaterMark: 1, - writable: false - }); - } else { - throw new ERR_INVALID_ARG_TYPE( - name, - ['Stream', 'Iterable', 'AsyncIterable', 'Function'], - stream) - ; - } - return ret; -} - -function fromAsyncGen(fn) { - let { promise, resolve } = createDeferredPromise(); - const value = fn(async function*() { - while (true) { - const { chunk, done, cb } = await promise; - process.nextTick(cb); - if (done) return; - yield chunk; - ({ promise, resolve } = createDeferredPromise()); - } - }()); - - return { - value, - write(chunk, encoding, cb) { - resolve({ chunk, done: false, cb }); - }, - final(cb) { - resolve({ done: true, cb }); - } - }; -} diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 1b56c08b9e6958..2aae1a3d875ef6 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -29,6 +29,7 @@ const { isIterable, isReadableNodeStream, isNodeStream, + isBody } = require('internal/streams/utils'); let PassThrough; @@ -198,6 +199,13 @@ function pipeline(...streams) { } } + for (let i = 0; i < streams.length; ++i) { + if (isBody(streams[i])) { + // TODO: Optimize by avoiding always converting into node stream. 
+ streams[i] = streams[i].nodeStream(); + } + } + let ret; for (let i = 0; i < streams.length; i++) { const stream = streams[i]; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 01396b5113340f..d1dab565df3214 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -8,6 +8,13 @@ const { const kDestroyed = Symbol('kDestroyed'); +function isBody(obj) { + return !!( + obj && + typeof obj.nodeStream === 'function' + ); +} + function isReadableNodeStream(obj) { return !!( obj && @@ -27,6 +34,10 @@ function isWritableNodeStream(obj) { ); } +function isDuplexNodeStream(obj) { + return isReadableNodeStream(obj) && isWritableNodeStream(obj); +} + function isNodeStream(obj) { return ( obj && @@ -197,8 +208,10 @@ function willEmitClose(stream) { module.exports = { kDestroyed, + isBody, isClosed, isDestroyed, + isDuplexNodeStream, isFinished, isIterable, isReadable, diff --git a/lib/stream.js b/lib/stream.js index b84efb0fd8862d..00c9ac40d0fd45 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -39,6 +39,7 @@ const promises = require('stream/promises'); const Stream = module.exports = require('internal/streams/legacy').Stream; Stream.Readable = require('internal/streams/readable'); +Stream.Body = require('internal/streams/body'); Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform');