Skip to content

Commit

Permalink
Try to implement cloning in readablestream
Browse files Browse the repository at this point in the history
Lacking an obvious way to actually postMessage a stream
in this polyfill, this adds the 'cloning' stream type
for testing purposes.

Byte streams to come.
  • Loading branch information
isonmad committed Dec 4, 2016
1 parent 4c7ae68 commit 7197836
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 13 deletions.
35 changes: 28 additions & 7 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,10 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat
1. If _highWaterMark_ is *undefined*, let _highWaterMark_ be *0*.
1. Set *this*.[[readableStreamController]] to ? Construct(`<a idl>ReadableByteStreamController</a>`, « *this*,
_underlyingSource_, _highWaterMark_ »).
1. Otherwise, if _type_ is *undefined*,
1. Otherwise, if _type_ is *undefined* or _type_ is `"cloning"`,
1. If _highWaterMark_ is *undefined*, let _highWaterMark_ be *1*.
1. Set *this*.[[readableStreamController]] to ? Construct(`<a idl>ReadableStreamDefaultController</a>`, « *this*,
_underlyingSource_, _size_, _highWaterMark_ »).
_underlyingSource_, _size_, _highWaterMark_, _type_ »).
1. Otherwise, throw a *RangeError* exception.
</emu-alg>

Expand Down Expand Up @@ -747,12 +747,17 @@ The following internal method is implemented by each {{ReadableStream}} instance
<emu-alg>
1. If ! IsReadableStreamLocked(*this*) is *true*, throw a *TypeError* exception.
1. If *this*.[[state]] is `"errored"`, throw a *TypeError* exception.
1. Let _controller_ be *this*.[[readableStreamController]].
1. If _controller_.[[targetRealm]] is *undefined*, throw a *TypeError* exception.
1. Let _that_ be a new instance of <a idl>ReadableStream</a> in _targetRealm_.
1. Set _that_.[[state]] to *this*.[[state]].
1. Set _that_.[[disturbed]] to *this*.[[disturbed]].
1. Let _controller_ be *this*.[[readableStreamController]].
1. Set _controller_.[[controlledReadableStream]] to _that_.
1. Set _that_.[[readableStreamController]] to _controller_.
1. Let _queue_ be _controller_.[[queue]].
1. Repeat for each Record {[[value]], [[size]]} _pair_ that is an element of _queue_,
1. Set _pair_.[[value]] to ! <a abstract-op>StructuredClone</a>(_pair_.[[value]], _targetRealm_).
1. Set _controller_.[[targetRealm]] to _targetRealm_.
1. Set _this_.[[Detached]] to *true*.
1. Return _that_.
</emu-alg>
Expand Down Expand Up @@ -1472,6 +1477,12 @@ Instances of {{ReadableStreamDefaultController}} are created with the internal s
<th>Description (<em>non-normative</em>)</th>
</tr>
</thead>
<tr>
<td>\[[targetRealm]]
<td>Either *undefined*, or a Realm Record. If set to a Realm Record, ReadableStreamDefaultControllerEnqueue
will perform the structured clone algorithm on passed chunks, cloning them into the targetRealm
and enqueueing the result.
</tr>
<tr>
<td>\[[closeRequested]]
<td>A boolean flag indicating whether the stream has been closed by its <a>underlying source</a>, but still has
Expand Down Expand Up @@ -1520,7 +1531,7 @@ Instances of {{ReadableStreamDefaultController}} are created with the internal s
<h4 id="rs-default-controller-constructor" constructor for="ReadableStreamDefaultController"
lt="ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)">new
ReadableStreamDefaultController(<var>stream</var>, <var>underlyingSource</var>, <var>size</var>,
<var>highWaterMark</var>)</h4>
<var>highWaterMark</var>, <var>type</var>)</h4>

<div class="note">
The <code>ReadableStreamDefaultController</code> constructor cannot be used directly; it only works on a
Expand All @@ -1534,6 +1545,8 @@ ReadableStreamDefaultController(<var>stream</var>, <var>underlyingSource</var>,
1. Set *this*.[[underlyingSource]] to _underlyingSource_.
1. Set *this*.[[queue]] to a new empty List.
1. Set *this*.[[started]], *this*.[[closeRequested]], *this*.[[pullAgain]], and *this*.[[pulling]] to *false*.
1. Set *this*.[[targetRealm]] to *undefined*.
1. If _type_ is `"cloning"`, set *this*.[[targetRealm]] to the current Realm Record.
1. Let _normalizedStrategy_ be ? ValidateAndNormalizeQueuingStrategy(_size_, _highWaterMark_).
1. Set *this*.[[strategySize]] to _normalizedStrategy_.[[size]] and *this*.[[strategyHWM]] to
_normalizedStrategy_.[[highWaterMark]].
Expand Down Expand Up @@ -1707,8 +1720,14 @@ asserts).
1. Let _stream_ be _controller_.[[controlledReadableStream]].
1. Assert: _controller_.[[closeRequested]] is *false*.
1. Assert: _stream_.[[state]] is `"readable"`.
1. If ! IsReadableStreamLocked(_stream_) is *true* and ! ReadableStreamGetNumReadRequests(_stream_) > *0*, perform
! ReadableStreamFulfillReadRequest(_stream_, _chunk_, *false*).
1. If ! IsReadableStreamLocked(_stream_) is *true* and ! ReadableStreamGetNumReadRequests(_stream_) > *0*,
1. If _controller_.[[targetRealm]] is not *undefined*,
1. Let _chunk_ be <a abstract-op>StructuredClone</a>(_chunk_, _controller_.[[targetRealm]]).
1. If _chunk_ is an abrupt completion,
1. Perform ! ReadableStreamDefaultControllerErrorIfNeeded(_controller_, _chunk_.[[Value]]).
1. Return _chunk_.
1. Let _chunk_ be _chunk_.[[Value]].
1. Perform ! ReadableStreamFulfillReadRequest(_stream_, _chunk_, *false*).
1. Otherwise,
1. Let _chunkSize_ be *1*.
1. If _controller_.[[strategySize]] is not *undefined*,
Expand Down Expand Up @@ -3615,11 +3634,13 @@ throughout the rest of this standard.
</emu-alg>

<h4 id="enqueue-value-with-size" aoid="EnqueueValueWithSize" throws>EnqueueValueWithSize ( <var>queue</var>,
<var>value</var>, <var>size</var> )</h4>
<var>value</var>, <var>size</var>, <var>targetRealm</var> )</h4>

<emu-alg>
1. If _targetRealm_ was not passed, let _targetRealm_ be *undefined*.
1. Let _size_ be ? ToNumber(_size_).
1. If ! IsFiniteNonNegativeNumber(_size_) is *false*, throw a *RangeError* exception.
1. If _targetRealm_ is not *undefined*, let _value_ be the result of ? StructuredClone(_value_, _targetRealm_).
1. Append Record {[[value]]: _value_, [[size]]: _size_} as the last element of _queue_.
</emu-alg>

Expand Down
8 changes: 7 additions & 1 deletion reference-implementation/lib/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';
const assert = require('assert');
/* structured clone is impossible to truly polyfill, but closest match */
const StructuredClone = require('realistic-structured-clone');
const { IsFiniteNonNegativeNumber } = require('./helpers.js');

exports.DequeueValue = queue => {
Expand All @@ -11,12 +13,16 @@ exports.DequeueValue = queue => {
return pair.value;
};

exports.EnqueueValueWithSize = (queue, value, size) => {
exports.EnqueueValueWithSize = (queue, value, size, targetRealm) => {
size = Number(size);
if (!IsFiniteNonNegativeNumber(size)) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}

if (targetRealm !== undefined) {
value = StructuredClone(value/* , targetRealm*/);
}

queue.push({ value, size });

if (queue._totalSize === undefined) {
Expand Down
35 changes: 30 additions & 5 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';
const assert = require('assert');
/* structured clone is impossible to truly polyfill, but closest match */
const StructuredClone = require('realistic-structured-clone');
const { ArrayBufferCopy, CreateIterResultObject, IsFiniteNonNegativeNumber, InvokeOrNoop, PromiseInvokeOrNoop,
SameRealmTransfer, ValidateAndNormalizeQueuingStrategy, ValidateAndNormalizeHighWaterMark } =
require('./helpers.js');
Expand Down Expand Up @@ -35,11 +37,13 @@ class ReadableStream {
highWaterMark = 0;
}
this._readableStreamController = new ReadableByteStreamController(this, underlyingSource, highWaterMark);
} else if (type === undefined) {
} else if (type === undefined || type === 'cloning') {
if (highWaterMark === undefined) {
highWaterMark = 1;
}
this._readableStreamController = new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark);

this._readableStreamController =
new ReadableStreamDefaultController(this, underlyingSource, size, highWaterMark, type);
} else {
throw new RangeError('Invalid type is specified');
}
Expand Down Expand Up @@ -261,14 +265,21 @@ class ReadableStream {
if (this._state === 'errored') {
throw new TypeError('Cannot transfer an errored stream');
}
const controller = this._readableStreamController;
if (controller._targetRealm === undefined) {
throw new TypeError('Only cloning streams are transferable');
}
/* can't exactly polyfill realm-transfer */
const that = new ReadableStream();
that._state = this._state;
that._disturbed = this._disturbed;

const controller = this._readableStreamController;
controller._controlledReadableStream = that;
that._readableStreamController = controller;
for (const pair of controller._queue) {
pair.value = StructuredClone(pair.value/* , targetRealm*/);
}
controller._targetRealm = that; // controller.[[targetRealm]] = targetRealm
this._Detached = true;

return that;
Expand Down Expand Up @@ -866,7 +877,7 @@ function ReadableStreamDefaultReaderRead(reader) {
// Controllers

class ReadableStreamDefaultController {
constructor(stream, underlyingSource, size, highWaterMark) {
constructor(stream, underlyingSource, size, highWaterMark, type) {
if (IsReadableStream(stream) === false) {
throw new TypeError('ReadableStreamDefaultController can only be constructed with a ReadableStream instance');
}
Expand All @@ -885,6 +896,12 @@ class ReadableStreamDefaultController {
this._closeRequested = false;
this._pullAgain = false;
this._pulling = false;
this._targetRealm = undefined;

if (type === 'cloning') {
/* no way to represent a Realm Record in polyfill */
this._targetRealm = stream; // set this.[[targetRealm]] to current Realm Record
}

const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(size, highWaterMark);
this._strategySize = normalizedStrategy.size;
Expand Down Expand Up @@ -1089,6 +1106,14 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
assert(stream._state === 'readable');

if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) {
if (controller._targetRealm !== undefined) {
try {
chunk = StructuredClone(chunk/* , controller._targetRealm */);
} catch (cloneE) {
ReadableStreamDefaultControllerErrorIfNeeded(controller, cloneE);
throw cloneE;
}
}
ReadableStreamFulfillReadRequest(stream, chunk, false);
} else {
let chunkSize = 1;
Expand All @@ -1103,7 +1128,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) {
}

try {
EnqueueValueWithSize(controller._queue, chunk, chunkSize);
EnqueueValueWithSize(controller._queue, chunk, chunkSize, controller._targetRealm);
} catch (enqueueE) {
ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueE);
throw enqueueE;
Expand Down
3 changes: 3 additions & 0 deletions reference-implementation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
"Takeshi Yoshino <tyoshino@chromium.org>"
],
"license": "(CC0-1.0 OR MIT)",
"dependencies": {
"realistic-structured-clone": "^0.0.3"
},
"devDependencies": {
"eslint": "^3.2.2",
"glob": "^7.0.3",
Expand Down

0 comments on commit 7197836

Please sign in to comment.