Skip to content

Commit

Permalink
naive attempt at transferring readable streams
Browse files Browse the repository at this point in the history
As [[storedError]] is observable, can be an arbitrary object,
and is very likely an uncloneable Error, it can't be sent to
a new realm reliably. So just forbid errored streams.

Still needs clearer semantics of when structured cloning occurs
and how DataCloneErrors are reported.

Cloning needs polyfilling somehow too.

Related to: whatwg#244, whatwg#276
  • Loading branch information
isonmad committed Dec 3, 2016
1 parent 872188a commit 20b4a8b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
28 changes: 27 additions & 1 deletion index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ Instances of {{ReadableStream}} are created with the internal slots described in
<th>Description (<em>non-normative</em>)</th>
</tr>
</thead>
<tr>
<td>\[[Detached]]
<td>A boolean flag set to <emu-val>true</emu-val> when the stream has been transferred away to another realm
</tr>
<tr>
<td>\[[disturbed]]
<td>A boolean flag set to <emu-val>true</emu-val> when the stream has been read from or canceled
Expand Down Expand Up @@ -448,6 +452,7 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat
<emu-alg>
1. Set *this*.[[state]] to `"readable"`.
1. Set *this*.[[reader]] and *this*.[[storedError]] to *undefined*.
1. Set *this*.[[Detached]] to *false*.
1. Set *this*.[[disturbed]] to *false*.
1. Set *this*.[[readableStreamController]] to *undefined*.
1. Let _type_ be ? GetV(_underlyingSource_, `"type"`).
Expand Down Expand Up @@ -733,6 +738,25 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat
</code></pre>
</div>

<h4 id="rs-internal-methods">Readable Stream Internal Methods</h4>

The following internal method is implemented by each {{ReadableStream}} instance.

<h5 id="rs-internal-method-transfer">\[[Transfer]](<var>targetRealm</var>)</h5>

<emu-alg>
1. If ! IsReadableStreamLocked(*this*) is *true*, throw a *TypeError* exception.
1. If *this.[[state]] is `"errored"`, throw a *TypeError* exception.
1. Let _that_ be a new instance of <a idl>ReadableStream</a> in _targetRealm_.
1. Set _that_.[[state]] to *this*.[[state]].
1. Set _that_.[[disturbed]] to *this*.[[disturbed]].
1. Let _controller_ be *this*.[[readableStreamController]].
1. Set _controller_.[[controlledReadableStream]] to _that_.
1. Set _that_.[[readableStreamController]] to _controller_.
1. Set _this_.[[Detached]] to *true*.
1. Return _that_.
</emu-alg>

<h3 id="rs-abstract-ops">General Readable Stream Abstract Operations</h3>

The following abstract operations, unlike most in this specification, are meant to be generally useful by other
Expand Down Expand Up @@ -781,10 +805,11 @@ readable stream has ever been read from or canceled.
<var>stream</var> )</h4>

This abstract operation is meant to be called from other specifications that may wish to query whether or not a
readable stream is <a>locked to a reader</a>.
readable stream is <a>locked to a reader</a> or has been transferred away to another realm.

<emu-alg>
1. Assert: ! IsReadableStream(_stream_) is *true*.
1. If _stream_.[[Detached]] is *true*, return *true*.
1. If _stream_.[[reader]] is *undefined*, return *false*.
1. Return *true*.
</emu-alg>
Expand Down Expand Up @@ -940,6 +965,7 @@ nothrow>ReadableStreamAddReadIntoRequest ( <var>stream</var> )</h4>
<var>reason</var> )</h4>

<emu-alg>
1. Assert: _stream_.[[Detached]] is *false*.
1. Set _stream_.[[disturbed]] to *true*.
1. If _stream_.[[state]] is `"closed"`, return <a>a promise resolved with</a> *undefined*.
1. If _stream_.[[state]] is `"errored"`, return <a>a promise rejected with</a> _stream_.[[storedError]].
Expand Down
27 changes: 27 additions & 0 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const { AcquireWritableStreamDefaultWriter, IsWritableStream, IsWritableStreamLo

const InternalCancel = Symbol('[[Cancel]]');
const InternalPull = Symbol('[[Pull]]');
const InternalTranfer = Symbol('[[Transfer]]');

class ReadableStream {
constructor(underlyingSource = {}, { size, highWaterMark } = {}) {
Expand All @@ -21,6 +22,7 @@ class ReadableStream {
this._reader = undefined;
this._storedError = undefined;

this._Detached = false;
this._disturbed = false;

// Initialize to undefined first because the constructor of the controller checks this
Expand Down Expand Up @@ -251,6 +253,26 @@ class ReadableStream {
const branches = ReadableStreamTee(this, false);
return createArrayFromList(branches);
}

[InternalTranfer](/* targetRealm */) {
if (IsReadableStreamLocked(this) === true) {
throw new TypeError('Cannot transfer a locked stream');
}
if (this.state === 'errored') {
throw new TypeError('Cannot transfer an errored stream');
}
/* can't exactly polyfill realm-transfer */
const that = new ReadableStream();
that._state = this._state;
that._disturbed = this._disturbed;

const controller = this._readableStreamController;
controller._controlledReadableStream = that;
that._readableStreamController = controller;
this._Detached = true;

return that;
}
}

module.exports = {
Expand Down Expand Up @@ -293,6 +315,9 @@ function IsReadableStreamDisturbed(stream) {
function IsReadableStreamLocked(stream) {
assert(IsReadableStream(stream) === true, 'IsReadableStreamLocked should only be used on known readable streams');

if (stream._Detached === true) {
return true;
}
if (stream._reader === undefined) {
return false;
}
Expand Down Expand Up @@ -469,6 +494,8 @@ function ReadableStreamAddReadRequest(stream) {
}

function ReadableStreamCancel(stream, reason) {
assert(stream._Detached === false);

stream._disturbed = true;

if (stream._state === 'closed') {
Expand Down

0 comments on commit 20b4a8b

Please sign in to comment.