Skip to content

Commit

Permalink
Implement readable stream teeing
Browse files Browse the repository at this point in the history
Closes #271; supercedes #302.

Includes an abstract operation, TeeReadableStream(stream, shouldClone) which is meant for use by other specs, plus a method ReadableStream.prototype.tee() (which does no cloning).
  • Loading branch information
domenic committed Mar 31, 2015
1 parent 84efaaf commit b326a6e
Show file tree
Hide file tree
Showing 5 changed files with 488 additions and 3 deletions.
160 changes: 159 additions & 1 deletion index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Logo: https://resources.whatwg.org/logo-streams.svg
!Version History: <a href="https://github.com/whatwg/streams/commits">https://github.com/whatwg/streams/commits</a>
!Version History: [SNAPSHOT-LINK]
!Version History: <a href="https://twitter.com/streamsstandard">@streamsstandard</a>
Link Defaults: html5 (dfn) structured clone
</pre>

<style>
Expand Down Expand Up @@ -97,6 +99,10 @@ Consumers also have the ability to <dfn lt="cancel a readable stream">cancel</df
that the consumer has lost interest in the stream, and will immediately close the stream, throw away any queued
<a>chunks</a>, and execute any cancellation mechanism of the <a>underlying source</a>.

Consumers can also <dfn lt="tee a readable stream">tee</dfn> a readable stream. This will
<a lt="locked to a reader">lock</a> the stream, making it no longer directly usable; however, it will create two new
streams, called <dfn lt="branches of a readable stream tee">branches</dfn>, which can be consumed independently.

<h3 id="ws-model">Writable Streams</h3>

A <dfn>writable stream</dfn> represents a destination for data, into which you can write. In other words, data goes
Expand Down Expand Up @@ -145,6 +151,10 @@ through it. If any step in the chain cannot yet accept chunks, it propagates a s
until eventually the original source is told to stop producing chunks so fast. This process of normalizing flow from
the original source according to how fast the chain can process chunks is called <dfn>backpressure</dfn>.

When <a lt="tee a readable stream">teeing</a> a readable stream, the <a>backpressure</a> signals from its two
<a href="branches of a readable stream tee">branches</a> will aggregate, such that if neither branch is read from, a
backpressure signal will be sent to the <a>underlying source</a> of the original stream.

<!-- TODO when we have writable stream writers
Piping a readable stream <a href="locked to a reader">locks</a> the readable stream, preventing it from being accessed
-->
Expand Down Expand Up @@ -247,6 +257,7 @@ would look like
getReader()
pipeThrough({ writable, readable }, options)
pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
tee()
}
</code></pre>

Expand Down Expand Up @@ -351,7 +362,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
1. Set *this*@[[state]] to "readable".
1. Set *this*@[[started]], *this*@[[closeRequested]], and *this*@[[pullScheduled]] to *false*.
1. Set *this*@[[reader]], *this*@[[pullingPromise]], and *this*@[[storedError]] to *undefined*.
1. Set *this*@[[controller]] to Construct(<code>ReadableStreamController</code>, «*this*»).
1. Set *this*@[[controller]] to Construct(`ReadableStreamController`, «*this*»).
1. Let _startResult_ be InvokeOrNoop(_underlyingSource_, "start", «*this*@[[controller]]»).
1. ReturnIfAbrupt(startResult).
1. Resolve _startResult_ as a promise:
Expand Down Expand Up @@ -479,6 +490,46 @@ For now, please consider the reference implementation normative:
<a href="https://github.com/whatwg/streams/blob/master/reference-implementation/lib/readable-stream.js">reference-implementation/lib/readable-stream.js</a>,
look for the <code>pipeTo</code> method.

<h5 id="rs-tee">tee()</h5>

<div class="note">
The <code>tee</code> method <a lt="tee a readable stream">tees</a> this readable stream, returning a two-element
array containing the two resulting branches as new <code>ReadableStream</code> instances.

Teeing a stream will <a lt="locked to a reader">lock</a> it, preventing any other consumer from acquiring a reader.
To <a lt="cancel a readable stream">cancel</a> the stream, cancel both of the resulting branches; a composite
cancellation reason will then be propagated to the stream's <a>underlying source</a>.

Note that the <a>chunks</a> seen in each branch will be the same object. If the chunks are not immutable, this could
allow interference between the two branches. (<a href="https://github.com/whatwg/streams/issues/new">Let us know</a>
if you think we should add an option to <code>tee</code> that creates <a>structured clones</a> of the chunks for each
branch.)
</div>

<pre is="emu-alg">
1. If IsReadableStream(*this*) is *false*, throw a *TypeError* exception.
1. Return CreateArrayFromList(TeeReadableStream(*this*, *false*)).
</pre>

<div class="example">
Teeing a stream is most useful when you wish to let two independent consumers read from the stream in parallel,
perhaps even at different speeds. For example, given a writable stream <code>cacheEntry</code> representing an
on-disk file, and another writable stream <code>httpRequestBody</code> representing an upload to a remote server,
you could pipe the same readable stream to both destinations at once:

<pre><code class="lang-javascript">
const [forLocal, forRemote] = readableStream.tee();

Promise.all([
forLocal.pipeTo(cacheEntry)
forRemote.pipeTo(httpRequestBody)
])
.then(() => console.log("Saved the stream to the cache and also uploaded it!"))
.catch(e => console.error("Either caching or uploading failed: ", e));
</code></pre>
</div>


<h3 id="rs-controller-class" lt="ReadableStreamController">Class <code>ReadableStreamController</code></h3>

The <code>ReadableStreamController</code> class has methods that allow control of a <code>ReadableStream</code>'s state
Expand Down Expand Up @@ -940,6 +991,113 @@ readable stream is <a>locked to a reader</a>.
1. Return _shouldApplyBackpressure_.
</pre>

<h4 id="tee-readable-stream" aoid="TeeReadableStream">TeeReadableStream ( stream, shouldClone )</h4>

This abstract operation is meant to be called from other specifications that may wish to
<a lt="tee a readable stream">tee</a> a given readable stream. Its second argument governs whether or not the data from
the original stream will be <a lt="structured clone">structured cloned</a> before becoming visible in the returned
branches.

<pre is="emu-alg">
1. Assert: IsReadableStream(_stream_) is *true*.
1. Assert: Type(_shouldClone_) is Boolean.
1. Let _reader_ be AcquireReadableStreamReader(_reader_).
1. ReturnIfAbrupt(_reader_).
1. Let _teeState_ be Record{[[closedOrErrored]]: *false*, [[canceled1]]: *false*, [[canceled2]]: *false*, [[reason1]]: *undefined*, [[reason2]]: *undefined*, [[promise]]: a new promise}.
1. Let _pull_ be a new <a>TeeReadableStream pull function</a>.
1. Set _pull_@[[reader]] to _reader_, _pull_@[[teeState]] to _teeState_, and _pull_@[[shouldClone]] to _shouldClone_.
1. Let _cancel1_ be a new <a>TeeReadableStream branch 1 cancel function</a>.
1. Set _cancel1_@[[stream]] to _stream_ and _cancel1_@[[teeState]] to _teeState_.
1. Let _cancel2_ be a new <a>TeeReadableStream branch 2 cancel function</a>.
1. Set _cancel2_@[[stream]] to _stream_ and _cancel2_@[[teeState]] to _teeState_.
1. Let _underlyingSource1_ be ObjectCreate(%ObjectPrototype%).
1. Perform CreateDataProperty(_underlyingSource1_, "pull", _pull_).
1. Perform CreateDataProperty(_underlyingSource1_, "cancel", _cancel1_).
1. Let _branch1_ be Construct(`ReadableStream`, _underlyingSource1_).
1. Let _underlyingSource2_ be ObjectCreate(%ObjectPrototype%).
1. Perform CreateDataProperty(_underlyingSource2_, "pull", _pull_).
1. Perform CreateDataProperty(_underlyingSource2_, "cancel", _cancel2_).
1. Let _branch2_ be Construct(`ReadableStream`, _underlyingSource2_).
1. Set _pull_@[[branch1]] to _branch1_.
1. Set _pull_@[[branch2]] to _branch2_.
1. Upon rejection of _reader_@[[closedPromise]] with reason _r_,
1. If _teeState_.[[closedOrErrored]] is *true*, return *undefined*.
1. Call-with-rethrow ErrorReadableStream(_branch1_, _r_).
1. Call-with-rethrow ErrorReadableStream(_branch2_, _r_).
1. Set _teeState_.[[closedOrErrored]] to *true*.
1. Return «branch1, branch2».
</pre>

A <dfn>TeeReadableStream pull function</dfn> is an anonymous built-in function that pulls data from a given <a>readable
stream reader</a> and enqueues it into two other streams ("branches" of the associated tee). Each TeeReadableStream
pull function has \[[reader]], \[[branch1]], \[[branch2]], \[[teeState]], and \[[shouldClone]] internal slots. When a
TeeReadableStream pull function <var>F</var> is called, it performs the following steps:

<pre is="emu-alg">
1. Let _reader_ be _F_@[[reader]], _branch1_ be _F_@[[branch1]], _branch2_ be _F_@[[branch2]], _teeState_ be _F_@[[teeState]], and _shouldClone_ be _F_@[[shouldClone]].
1. Return the result of transforming ReadFromReadableStreamReader(_reader_) by a fulfillment handler which takes the argument _result_ and performs the following steps:
1. Assert: Type(_result_) is Object.
1. Let _value_ be Get(_result_, "value").
1. ReturnIfAbrupt(_value_).

This comment has been minimized.

Copy link
@tyoshino

tyoshino Apr 6, 2015

Member

Can we just Assert here?

This comment has been minimized.

Copy link
@domenic

domenic Apr 6, 2015

Author Member

I think if someone messed with Object.prototype.value, this might fail.

The ES spec would probably handle this with some kind of Record (i.e., a spec type that doesn't correspond to a real object). But, I am not sure how to integrate that with promises, which can only hold real objects. Maybe this is related to your GenerateChunk idea, and we need to refactor more.

1. Let _done_ be Get(_result_, "done").
1. ReturnIfAbrupt(_done_).

This comment has been minimized.

Copy link
@tyoshino

tyoshino Apr 6, 2015

Member

ditto

1. Assert: Type(_done_) is Boolean.
1. If _done_ is *true* and _teeState_.[[closedOrErrored]] is *false*,
1. Call-with-rethrow CloseReadableStream(_branch1_).
1. Call-with-rethrow CloseReadableStream(_branch2_).
1. Set _teeState_.[[closedOrErrored]] to *true*.
1. If _teeState_.[[closedOrErrored]] is *true*, return *undefined*.
1. If _teeState_.[[canceled1]] is *false*,
1. Let _value1_ be _value_.
1. If _shouldClone_ is *true*, set _value1_ to a <a>structured clone</a> of _value_.
1. Call-with-rethrow EnqueueInReadableStream(_branch1_, _value1_).
1. If _teeState_.[[canceled2]] is *false*,
1. Let _value2_ be _value_.
1. If _shouldClone_ is *true*, set _value2_ to a <a>structured clone</a> of _value_.

This comment has been minimized.

Copy link
@tyoshino

tyoshino Apr 6, 2015

Member

I don't remember if we have discussed this.

Do we create two clones and throw away the original?

This comment has been minimized.

Copy link
@domenic

domenic Apr 6, 2015

Author Member

Yeah, @yutakahirano and I discussed this briefly in person in Tokyo. We think symmetry is important. For example for the non-byte-stream-case, the structured clone of const r = new RegExp(); r.foo = "bar" does not have a foo property.

For the byte stream case, especially a UA-generated byte stream where we know the Uint8Arrays don't have expandos or anything, we could optimize by only creating one clone and re-using the original. But that's an optimization.

I will add a note about this in the spec.

1. Call-with-rethrow EnqueueInReadableStream(_branch2_, _value2_).
</pre>

A <dfn>TeeReadableStream branch 1 cancel function</dfn> is an anonymous built-in function that reacts to the
cancellation of the first of the two branches of the associated tee. Each TeeReadableStream branch 1 cancel function
has \[[stream]] and \[[teeState]] internal slots. When a TeeReadableStream branch 1 cancel function <var>F</var> is
called with argument <var>r</var>, it performs the following steps:

<pre is="emu-alg">
1. Let _stream_ be _F_@[[stream]] and _teeState_ be _F_@[[teeState]].
1. Set _teeState_.[[canceled1]] to *true*.
1. Set _teeState_.[[reason1]] to _r_.
1. If _teeState_.[[canceled2]] is *true*,
1. Let _compositeReason_ be CreateArrayFromList(«_teeState_.[[reason1]], _teeState_.[[reason2]]»).
1. Let _cancelResult_ be CancelReadableStream(_stream_, _compositeReason_).
1. ReturnIfAbrupt(_cancelResult_).
1. Resolve _teeState_.[[promise]] with _cancelResult_.
1. Return _teeState_.[[promise]].
</pre>

A <dfn>TeeReadableStream branch 2 cancel function</dfn> is an anonymous built-in function that reacts to the
cancellation of the second of the two branches of the associated tee. Each TeeReadableStream branch 2 cancel function
has \[[stream]] and \[[teeState]] internal slots. When a TeeReadableStream branch 2 cancel function <var>F</var> is
called with argument <var>r</var>, it performs the following steps:

<pre is="emu-alg">
1. Let _stream_ be _F_@[[stream]] and _teeState_ be _F_@[[teeState]].
1. Set _teeState_.[[canceled2]] to *true*.
1. Set _teeState_.[[reason2]] to _r_.
1. If _teeState_.[[canceled1]] is *true*,
1. Let _compositeReason_ be CreateArrayFromList(«_teeState_.[[reason1]], _teeState_.[[reason2]]»).
1. Let _cancelResult_ be CancelReadableStream(_stream_, _compositeReason_).
1. ReturnIfAbrupt(_cancelResult_).
1. Resolve _teeState_.[[promise]] with _cancelResult_.
1. Return _teeState_.[[promise]].
</pre>

<div class="note">
The algorithm given here is written such that three new function objects are created for each call to to
TeeReadableStream. This is just a simplification, and is not actually necessary, since it is unobservable to
developer code. For example, a self-hosted implementation could optimize by creating a class whose prototype contains
methods for these functions, with the state stored as instance variables.
</div>

<h2 id="ws">Writable Streams</h2>

<h3 id="ws-intro">Using Writable Streams</h3>
Expand Down
11 changes: 11 additions & 0 deletions reference-implementation/lib/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ export function toInteger(v) {
return Math.floor(Math.abs(v));
}

export function createDataProperty(o, p, v) {
assert(typeIsObject(o));
Object.defineProperty(o, p, { value: v, writable: true, enumerable: true, configurable: true });
}

export function createArrayFromList(elements) {
// We use arrays to represent lists, so this is basically a no-op.
// Do a slice though just in case we happen to depend on the unique-ness.
return elements.slice();
}

export function CreateIterResultObject(value, done) {
assert(typeof done === 'boolean');
const obj = {};
Expand Down
141 changes: 140 additions & 1 deletion reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const assert = require('assert');
import { CreateIterResultObject, InvokeOrNoop, PromiseInvokeOrNoop, typeIsObject } from './helpers';
import { CreateIterResultObject, InvokeOrNoop, PromiseInvokeOrNoop } from './helpers';
import { createArrayFromList, createDataProperty, typeIsObject } from './helpers';
import { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } from './queue-with-sizes';

export default class ReadableStream {
Expand Down Expand Up @@ -139,6 +140,14 @@ export default class ReadableStream {
rejectPipeToPromise(reason);
}
}

tee() {
if (IsReadableStream(this) === false) {
throw new TypeError('ReadableStream.prototype.tee can only be used on a ReadableStream');
}

return createArrayFromList(TeeReadableStream(this, false));
}
}

class ReadableStreamController {
Expand Down Expand Up @@ -552,3 +561,133 @@ function ShouldReadableStreamApplyBackpressure(stream) {

return shouldApplyBackpressure;
}

function TeeReadableStream(stream, shouldClone) {
assert(IsReadableStream(stream) === true);
assert(typeof shouldClone === 'boolean');

const reader = AcquireReadableStreamReader(stream);

const teeState = {
closedOrErrored: false,
canceled1: false,
canceled2: false,
reason1: undefined,
reason2: undefined
};
teeState.promise = new Promise(resolve => teeState._resolve = resolve);

const pull = create_TeeReadableStreamPullFunction();
pull._reader = reader;
pull._teeState = teeState;
pull._shouldClone = shouldClone;

const cancel1 = create_TeeReadableStreamBranch1CancelFunction();
cancel1._stream = stream;
cancel1._teeState = teeState;

const cancel2 = create_TeeReadableStreamBranch2CancelFunction();
cancel2._stream = stream;
cancel2._teeState = teeState;

const underlyingSource1 = Object.create(Object.prototype);
createDataProperty(underlyingSource1, 'pull', pull);
createDataProperty(underlyingSource1, 'cancel', cancel1);
const branch1 = new ReadableStream(underlyingSource1);

const underlyingSource2 = Object.create(Object.prototype);
createDataProperty(underlyingSource2, 'pull', pull);
createDataProperty(underlyingSource2, 'cancel', cancel2);
const branch2 = new ReadableStream(underlyingSource2);

pull._branch1 = branch1;
pull._branch2 = branch2;

reader._closedPromise.catch(r => {
if (teeState.closedOrErrored === true) {
return undefined;
}

ErrorReadableStream(branch1, r);
ErrorReadableStream(branch2, r);
teeState.closedOrErrored = true;
});

return [branch1, branch2];
}

function create_TeeReadableStreamPullFunction() {
const f = () => {
const { _reader: reader, _branch1: branch1, _branch2: branch2, _teeState: teeState, _shouldClone: shouldClone } = f;

return ReadFromReadableStreamReader(reader).then(result => {
assert(typeIsObject(result));
const value = result.value;
const done = result.done;
assert(typeof done === "boolean");

if (done === true && teeState.closedOrErrored === false) {
CloseReadableStream(branch1);
CloseReadableStream(branch2);
teeState.closedOrErrored = true;
}

if (teeState.closedOrErrored === true) {
return undefined;
}

// There is no way to access the cloning code right now in the reference implementation.
// If we add one then we'll need an implementation for StructuredClone.


if (teeState.canceled1 === false) {
let value1 = value;
// if (shouldClone === true) {
// value1 = StructuredClone(value);
// }
EnqueueInReadableStream(branch1, value1);
}

if (teeState.canceled2 === false) {
let value2 = value;
// if (shouldClone === true) {
// value2 = StructuredClone(value);
// }
EnqueueInReadableStream(branch2, value2);
}
});
};
return f;
}

function create_TeeReadableStreamBranch1CancelFunction() {
const f = reason => {
const { _stream: stream, _teeState: teeState } = f;

teeState.canceled1 = true;
teeState.reason1 = reason;
if (teeState.canceled2 === true) {
const compositeReason = createArrayFromList([teeState.reason1, teeState.reason2]);
const cancelResult = CancelReadableStream(stream, compositeReason);
teeState._resolve(cancelResult);
}
return teeState.promise;
};
return f;
}

function create_TeeReadableStreamBranch2CancelFunction() {
const f = reason => {
const { _stream: stream, _teeState: teeState } = f;

teeState.canceled2 = true;
teeState.reason2 = reason;
if (teeState.canceled1 === true) {
const compositeReason = createArrayFromList([teeState.reason1, teeState.reason2]);
const cancelResult = CancelReadableStream(stream, compositeReason);
teeState._resolve(cancelResult);
}
return teeState.promise;
};
return f;
}
Loading

0 comments on commit b326a6e

Please sign in to comment.