Skip to content

Commit

Permalink
Make readable stream's ready promise fulfill-only
Browse files Browse the repository at this point in the history
Part of #245; see discussions in #243 (comment). Consumers should always either check the state property, or count on the fact that read() will throw when used on an errored stream.
  • Loading branch information
domenic committed Dec 1, 2014
1 parent d4dde16 commit 8ad6f2b
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 94 deletions.
49 changes: 25 additions & 24 deletions Examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@ Although the by-far most common way of consuming a readable stream will be to pi

```js
function streamToConsole(readable) {
readable.closed.then(
() => console.log("--- all done!"),
e => console.error(e);
);

pump();

function pump() {
while (readable.state === "readable") {
console.log(readable.read());
}

if (readable.state === "closed") {
console.log("--- all done!");
} else {
// If we're in an error state, the returned promise will be rejected with that error,
// so no need to handle "waiting" vs. "errored" separately.
readable.ready.then(pump, e => console.error(e));
if (readable.state === "waiting") {
readable.ready.then(pump);
}

// Otherwise the stream is "closed" or "errored", which will be handled above.
}
}
```
Expand All @@ -45,12 +48,12 @@ function getNext(stream) {
}

return stream.ready.then(function () {
if (stream.state === "readable") {
return stream.read();
if (stream.state === "closed") {
return EOF;
}

// State must be "closed":
return EOF;
// If stream is "errored", this will throw, causing the promise to be rejected.
return stream.read();
});
}

Expand All @@ -68,24 +71,22 @@ As a final example, this function uses the reading APIs to buffer the entire str

```js
function readableStreamToArray(readable) {
return new Promise((resolve, reject) => {
var chunks = [];

readable.closed.then(() => resolve(chunks), reject);
pump();
var chunks = [];

function pump() {
while (readable.state === "readable") {
chunks.push(readable.read());
}
pump();
return readable.closed.then(() => chunks);

if (readable.state === "waiting") {
readable.ready.then(pump);
}
function pump() {
while (readable.state === "readable") {
chunks.push(readable.read());
}

// All other cases will go through `readable.closed.then(...)` above.
if (readable.state === "waiting") {
readable.ready.then(pump);
}
});

// Otherwise the stream is "closed" or "errored", which will be handled above.
}
}

readableStreamToArray(myStream).then(chunks => {
Expand Down
26 changes: 14 additions & 12 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -305,20 +305,23 @@ it.

<pre><code class="lang-javascript">
function streamToConsole(readableStream) {
readableStream.closed.then(
() => console.log("--- all done!"),
e => console.error(e);
);

pump();

function pump() {
while (readableStream.state === "readable") {
console.log(readableStream.read());
}

if (readableStream.state === "closed") {
console.log("--- all done!");
} else {
// If we're in an error state, the returned promise will be rejected with
// that error, so no need to handle "waiting" vs. "errored" separately.
readableStream.ready.then(pump, e => console.error(e));
if (readableStream.state === "waiting") {
readableStream.ready.then(pump);
}

// otherwise "closed" or "errored", which will be handled above.
}
}
</code></pre>
Expand Down Expand Up @@ -449,8 +452,7 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
</tr>
<tr>
<td>\[[readyPromise]]
<td>A promise that becomes fulfilled when the stream becomes <code>"readable"</code>, and is replaced with a new
pending promise when the stream becomes <code>"waiting"</code>; returned by the <code>ready</code> getter
<td>A promise returned by the <code>ready</code> getter
</tr>
</table>

Expand Down Expand Up @@ -524,8 +526,9 @@ Instances of <code>ReadableStream</code> are created with the internal slots des
<h5 id="rs-ready">get ready</h5>

<div class="note">
The <code>ready</code> getter returns a promise that will be fulfilled either when the stream's internal queue becomes
nonempty, or the stream becomes closed. (The promise will be rejected if the stream errors.)
The <code>ready</code> getter returns a promise that will be fulfilled when the stream transitions away from the
<code>"waiting"</code> state to any other state. Once the stream transitions back to <code>"waiting"</code>, the
getter will return a new promise that stays pending until the next state transition.
</div>

<ol>
Expand Down Expand Up @@ -729,15 +732,14 @@ a variable <var>stream</var>, that performs the following steps:
<ol>
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
<li> Reject <var>stream</var>@\[[readyPromise]] with <var>e</var>.
<li> Resolve <var>stream</var>@\[[readyPromise]] with <b>undefined</b>.
<li> Reject <var>stream</var>@\[[closedPromise]] with <var>e</var>.
</ol>
<li> If <var>stream</var>@\[[state]] is <code>"readable"</code>,
<ol>
<li> Let <var>stream</var>@\[[queue]] be a new empty List.
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
<li> Let <var>stream</var>@\[[readyPromise]] be a new promise rejected with <var>e</var>.
<li> Reject <var>stream</var>@\[[closedPromise]] with <var>e</var>.
</ol>
</ol>
Expand Down
21 changes: 4 additions & 17 deletions reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export default class ReadableStream {
} else if (source.state === 'waiting') {
Promise.race([source.ready, dest.closed]).then(doPipe, doPipe);
} else if (source.state === 'errored') {
source.ready.catch(abortDest);
source.closed.catch(abortDest);
} else if (source.state === 'closed') {
closeDest();
}
Expand All @@ -121,12 +121,12 @@ export default class ReadableStream {
} else if (source.state === 'waiting') {
Promise.race([source.ready, dest.ready]).then(doPipe, doPipe);
} else if (source.state === 'errored') {
source.ready.catch(abortDest);
source.closed.catch(abortDest);
} else if (source.state === 'closed') {
closeDest();
}
} else if (ds === 'errored' && (source.state === 'readable' || source.state === 'waiting')) {
dest.ready.catch(cancelSource);
dest.closed.catch(cancelSource);
} else if ((ds === 'closing' || ds === 'closed') &&
(source.state === 'readable' || source.state === 'waiting')) {
cancelSource(new TypeError('destination is closing or closed and cannot be piped to anymore'));
Expand Down Expand Up @@ -196,7 +196,6 @@ export default class ReadableStream {
_initReadyPromise() {
this._readyPromise = new Promise((resolve, reject) => {
this._readyPromise_resolve = resolve;
this._readyPromise_reject = reject;
});
}

Expand All @@ -216,13 +215,6 @@ export default class ReadableStream {
_resolveReadyPromise(value) {
this._readyPromise_resolve(value);
this._readyPromise_resolve = null;
this._readyPromise_reject = null;
}

_rejectReadyPromise(reason) {
this._readyPromise_reject(reason);
this._readyPromise_resolve = null;
this._readyPromise_reject = null;
}

_resolveClosedPromise(value) {
Expand Down Expand Up @@ -322,18 +314,13 @@ function CreateReadableStreamErrorFunction(stream) {
if (stream._state === 'waiting') {
stream._state = 'errored';
stream._storedError = e;
stream._rejectReadyPromise(e);
stream._resolveReadyPromise(undefined);
stream._rejectClosedPromise(e);
}
else if (stream._state === 'readable') {
stream._queue = [];
stream._state = 'errored';
stream._storedError = e;

stream._readyPromise = Promise.reject(e);
stream._readyPromise_resolve = null;
stream._readyPromise_reject = null;

stream._rejectClosedPromise(e);
}
};
Expand Down
30 changes: 13 additions & 17 deletions reference-implementation/test/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,19 +164,14 @@ test('ReadableStream pull throws an error', t => {
var error = new Error('aaaugh!!');
var rs = new ReadableStream({ pull() { throw error; } });

rs.ready.then(() => {
t.fail('waiting should fail');
t.end();
});

rs.closed.then(() => {
t.fail('the stream should not close successfully');
t.end();
});

rs.ready.catch(caught => {
t.equal(rs.state, 'errored', 'state is "errored" after waiting');
t.equal(caught, error, 'error was passed through as rejection of ready');
rs.ready.then(v => {
t.equal(rs.state, 'errored', 'state is "errored" after waiting'),
t.equal(v, undefined, 'ready fulfills with undefined')
});

rs.closed.catch(caught => {
Expand Down Expand Up @@ -517,7 +512,9 @@ test('ReadableStream if size is NaN, the stream is errored', t => {
});
});

test('ReadableStream errors in shouldApplyBackpressure prevent ready from fulfilling', t => {
test('ReadableStream errors in shouldApplyBackpressure cause ready to fulfill and closed to rejected', t => {
t.plan(3);

var thrownError = new Error('size failure');
var callsToShouldApplyBackpressure = 0;
var rs = new ReadableStream({
Expand Down Expand Up @@ -546,13 +543,12 @@ test('ReadableStream errors in shouldApplyBackpressure prevent ready from fulfil
});

rs.ready.then(
() => {
t.fail('ready should not be fulfilled');
t.end();
},
e => {
t.equal(e, thrownError, 'ready should be rejected with the thrown error');
t.end();
}
v => t.equal(v, undefined, 'ready should be fulfilled with undefined'),
e => t.fail('ready should not be rejected')
);

rs.closed.then(
v => t.fail('closed should not be fulfilled'),
e => t.equal(e, thrownError, 'closed should be rejected with the thrown error')
);
});
14 changes: 2 additions & 12 deletions reference-implementation/test/transform-stream-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ var test = require('tape');
import TransformStream from '../lib/transform-stream';

test('TransformStream errors thrown in transform put the writable and readable in an errored state', t => {
t.plan(10);
t.plan(9);

var thrownError = new Error('bad things are happening!');
var ts = new TransformStream({
Expand Down Expand Up @@ -32,11 +32,6 @@ test('TransformStream errors thrown in transform put the writable and readable i
}
}, 0);

ts.readable.ready.then(
() => t.fail('readable\'s ready should not be fulfilled'),
e => t.equal(e, thrownError, 'readable\'s ready should be rejected with the thrown error')
);

ts.readable.closed.then(
() => t.fail('readable\'s closed should not be fulfilled'),
e => t.equal(e, thrownError, 'readable\'s closed should be rejected with the thrown error')
Expand All @@ -49,7 +44,7 @@ test('TransformStream errors thrown in transform put the writable and readable i
});

test('TransformStream errors thrown in flush put the writable and readable in an errored state', t => {
t.plan(12);
t.plan(11);

var thrownError = new Error('bad things are happening!');
var ts = new TransformStream({
Expand Down Expand Up @@ -86,11 +81,6 @@ test('TransformStream errors thrown in flush put the writable and readable in an
}
}, 0);

ts.readable.ready.then(
() => t.fail('readable\'s ready should not be fulfilled'),
e => t.equal(e, thrownError, 'readable\'s ready should be rejected with the thrown error')
);

ts.readable.closed.then(
() => t.fail('readable\'s closed should not be fulfilled'),
e => t.equal(e, thrownError, 'readable\'s closed should be rejected with the thrown error')
Expand Down
24 changes: 12 additions & 12 deletions reference-implementation/test/utils/readable-stream-to-array.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
export default function readableStreamToArray(readable) {
return new Promise((resolve, reject) => {
var chunks = [];
var chunks = [];

readable.closed.then(() => resolve(chunks), reject);
pump();
pump();
return readable.closed.then(() => chunks);

function pump() {
while (readable.state === 'readable') {
chunks.push(readable.read());
}
function pump() {
while (readable.state === "readable") {
chunks.push(readable.read());
}

if (readable.state === 'waiting') {
readable.ready.then(pump);
}
if (readable.state === "waiting") {
readable.ready.then(pump);
}
});

// Otherwise the stream is "closed" or "errored", which will be handled above.
}
}

0 comments on commit 8ad6f2b

Please sign in to comment.