Skip to content

Commit

Permalink
Make writable stream's ready promise fulfill-only
Browse files Browse the repository at this point in the history
Closes #245. Now .ready is just a signal that the stream has transitioned from "waiting" to any other state.
  • Loading branch information
domenic committed Dec 1, 2014
1 parent 8ad6f2b commit f40e1ef
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 98 deletions.
4 changes: 1 addition & 3 deletions Examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,7 @@ function promptAndWrite(myStream) {
});
} else if (writableStream.state === "waiting") {
console.log("Waiting for the stream to flush to the underlying sink, please hold...");
writableStream.ready
.then(promptAndWrite)
.catch(e => console.error("While flushing, an error occurred: ", e));
writableStream.ready.then(promptAndWrite);
} else if (writableStream.state === "errored") {
console.error("Error writing; this session is over!");
}
Expand Down
23 changes: 9 additions & 14 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,8 +1027,7 @@ Instances of <code>WritableStream</code> are created with the internal slots des
</tr>
<tr>
<td>\[[readyPromise]]
<td>A promise that becomes fulfilled when the stream becomes <code>"writable"</code>, and is replaced with a new
pending promise when the stream becomes <code>"waiting"</code>; returned by the <code>ready</code> getter
<td>A promise returned by the <code>ready</code> getter
</tr>
<tr>
<td>\[[writing]]
Expand Down Expand Up @@ -1122,11 +1121,12 @@ Instances of <code>WritableStream</code> are created with the internal slots des
<h5 id="ws-ready">get ready</h5>

<div class="note">
The <code>ready</code> getter returns a promise that will be fulfilled when the stream enters the
<code>"writable"</code> state, i.e., when the stream's internal queue is not full according to its <a>queuing
strategy</a>. (The promise will be rejected if the stream errors.)
The <code>ready</code> getter returns a promise that will be fulfilled when the stream transitions away from the
<code>"waiting"</code> state to any other state. Once the stream transitions back to <code>"waiting"</code>, the
getter will return a new promise that stays pending until the next state transition.

In essence, this promise gives a signal as to when any backpressure has let up.
In essence, this promise gives a signal as to when any backpressure has let up (or that the stream has been closed
or errored).
</div>

<ol>
Expand Down Expand Up @@ -1194,10 +1194,7 @@ Instances of <code>WritableStream</code> are created with the internal slots des
<b>TypeError</b> exception.
<li> If <b>this</b>@\[[state]] is <code>"errored"</code>, return a promise rejected with
<b>this</b>@\[[storedError]].
<li> If <b>this</b>@\[[state]] is <code>"writable"</code>, set <b>this</b>@\[[readyPromise]] to a new promise
rejected with a <b>TypeError</b> exception.
<li> If <b>this</b>@\[[state]] is <code>"waiting"</code>, reject <b>this</b>@\[[readyPromise]] with a
<b>TypeError</b> exception.
<li> If <b>this</b>@\[[state]] is <code>"waiting"</code>, resolve <b>this</b>@\[[readyPromise]] with <b>undefined</b>.
<li> Set <b>this</b>@\[[state]] to <code>"closing"</code>
<li> Call-with-rethrow EnqueueValueWithSize(<b>this</b>@\[[queue]], <code>"close"</code>, <b>0</b>).
<li> Call-with-rethrow CallOrScheduleWritableStreamAdvanceQueue(<b>this</b>).
Expand Down Expand Up @@ -1301,10 +1298,8 @@ a variable <var>stream</var>, that performs the following steps:
<var>e</var>.
</ol>
<li> Set <var>stream</var>@\[[storedError]] to <var>e</var>.
<li> If <var>stream</var>@\[[state]] is <code>"writable"</code> or <code>"closing"</code>, set
<var>stream</var>@\[[readyPromise]] to a new promise rejected with <var>e</var>.
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>, reject <var>stream</var>@\[[readyPromise]] with
<var>e</var>.
<li> If <var>stream</var>@\[[state]] is <code>"waiting"</code>, resolve <var>stream</var>@\[[readyPromise]] with
<b>undefined</b>.
<li> Reject <var>stream</var>@\[[closedPromise]] with <var>e</var>.
<li> Set <var>stream</var>@\[[state]] to <code>"errored"</code>.
</ol>
Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/lib/readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export default class ReadableStream {
if (source.state === 'readable') {
Promise.race([source.closed, dest.ready]).then(doPipe, doPipe);
} else if (source.state === 'waiting') {
Promise.race([source.ready, dest.ready]).then(doPipe, doPipe);
Promise.race([source.ready, dest.ready]).then(doPipe);
} else if (source.state === 'errored') {
source.closed.catch(abortDest);
} else if (source.state === 'closed') {
Expand Down
16 changes: 2 additions & 14 deletions reference-implementation/lib/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ export default class WritableStream {

this._readyPromise = Promise.resolve(undefined);
this._readyPromise_resolve = null;
this._readyPromise_reject = null;

this._queue = [];
this._state = 'writable';
Expand Down Expand Up @@ -87,13 +86,8 @@ export default class WritableStream {
if (this._state === 'errored') {
return Promise.reject(this._storedError);
}
if (this._state === 'writable') {
this._readyPromise = Promise.reject(new TypeError('stream has already been closed'));
this._readyPromise_resolve = null;
this._readyPromise_reject = null;
}
if (this._state === 'waiting') {
this._readyPromise_reject(new TypeError('stream has already been closed'));
this._readyPromise_resolve(undefined);
}

this._state = 'closing';
Expand Down Expand Up @@ -203,13 +197,8 @@ function CreateWritableStreamErrorFunction(stream) {

stream._storedError = e;

if (stream._state === 'writable' || stream._state === 'closing') {
stream._readyPromise = Promise.reject(e);
stream._readyPromise_resolve = null;
stream._readyPromise_reject = null;
}
if (stream._state === 'waiting') {
stream._readyPromise_reject(e);
stream._readyPromise_resolve(undefined);
}
stream._closedPromise_reject(e);
stream._state = 'errored';
Expand All @@ -231,7 +220,6 @@ function SyncWritableStreamStateWithQueue(stream) {
stream._state = 'waiting';
stream._readyPromise = new Promise((resolve, reject) => {
stream._readyPromise_resolve = resolve;
stream._readyPromise_reject = reject;
});
}

Expand Down
22 changes: 10 additions & 12 deletions reference-implementation/test/pipe-to.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,18 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro
ws.write('Hello');
t.assert(writeCalled, 'write must be called');

ws.ready.then(
() => t.fail('ready promise unexpectedly fulfilled'),
() => {
t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state');
ws.ready.then(() => {
t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state');

rs.pipeTo(ws);
rs.pipeTo(ws);

// Need to delay because pipeTo retrieves error from dest using ready.
setTimeout(() => {
t.assert(cancelCalled);
t.equal(rs.state, 'closed');
t.end();
}, 0);
});
// Need to delay because pipeTo retrieves error from dest using ready.
setTimeout(() => {
t.assert(cancelCalled);
t.equal(rs.state, 'closed');
t.end();
}, 0);
});
}, 0);
});

Expand Down
22 changes: 10 additions & 12 deletions reference-implementation/test/writable-stream-abort.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ test('Aborting a WritableStream passes through the given reason', t => {
});

test('Aborting a WritableStream puts it in an errored state, with stored error equal to the abort reason', t => {
t.plan(6);
t.plan(5);

var recordedReason;
var ws = new WritableStream();
Expand All @@ -113,11 +113,6 @@ test('Aborting a WritableStream puts it in an errored state, with stored error e
r => t.equal(r, passedReason, 'writing should reject with the given reason')
);

ws.ready.then(
() => t.fail('ready should not succeed'),
r => t.equal(r, passedReason, 'ready should reject with the given reason')
);

ws.close().then(
() => t.fail('closing should not succeed'),
r => t.equal(r, passedReason, 'closing should reject with the given reason')
Expand All @@ -134,18 +129,21 @@ test('Aborting a WritableStream puts it in an errored state, with stored error e
);
});

test('Aborting a WritableStream causes any outstanding ready promises to be rejected with the abort reason', t => {
test('Aborting a WritableStream causes any outstanding ready promises to be fulfilled immediately', t => {
t.plan(2);

var recordedReason;
var ws = new WritableStream({});
var ws = new WritableStream({
write(chunk) {
return new Promise(() => {}); // forever-pending, so normally .ready would not fulfill.
}
});
ws.write('a');
t.equal(ws.state, 'waiting', 'state should be waiting');

ws.ready.then(
() => t.fail('ready should not succeed'),
r => t.equal(r, passedReason, 'ready should reject with the given reason')
);
ws.ready.then(() => {
t.equal(ws.state, 'errored', 'state should now be errored');
});

var passedReason = new Error('Sorry, it just wasn\'t meant to be.');
ws.abort(passedReason);
Expand Down
110 changes: 68 additions & 42 deletions reference-implementation/test/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ test('WritableStream if write returns a rejected promise, queued write and close
}, 0);
});

test('If close is called on a WritableStream in writable state, ready will return a rejected promise', t => {
test('If close is called on a WritableStream in writable state, ready will return a fulfilled promise', t => {
var ws = new WritableStream({
write() {
t.fail('Unexpected write call');
Expand All @@ -350,7 +350,7 @@ test('If close is called on a WritableStream in writable state, ready will retur
abort() {
t.fail('Unexpected abort call');
t.end();
},
}
});

// Wait for ws to start.
Expand All @@ -360,23 +360,55 @@ test('If close is called on a WritableStream in writable state, ready will retur
ws.close();
t.equal(ws.state, 'closing', 'state must become closing synchronously on close call');

ws.ready.then(
() => t.fail('ready on ws returned a fulfilled promise unexpectedly'),
r => {
t.equal(r.constructor, TypeError,
'ready must start returning a promise rejected with a TypeError exception');
t.end();
}
);
ws.ready.then(v => {
t.equal(ws.state, 'closed', 'state must be closed by the time ready fulfills (because microtasks ordering)');
t.equal(v, undefined, 'ready promise was fulfilled with undefined');
t.end();
});
}, 0);
});

test('If close is called on a WritableStream in waiting state, ready will return a rejected promise', t => {
test('If close is called on a WritableStream in waiting state, ready will return a fulfilled promise', t => {
var ws = new WritableStream({
abort() {
t.fail('Unexpected abort call');
t.end();
}
});

// Wait for ws to start.
setTimeout(() => {
ws.write('a');
t.equal(ws.state, 'waiting', 'state must become waiting synchronously on write call');

ws.close();
t.equal(ws.state, 'closing', 'state must become closing synchronously on close call');

ws.ready.then(v => {
t.equal(ws.state, 'closing', 'state must still be closing when ready fulfills');
t.equal(v, undefined, 'ready promise was fulfilled with undefined');
t.end();
});
}, 0);
});

test('If close is called on a WritableStream in waiting state, ready will be fulfilled immediately even if close ' +
'takes a long time', t => {

var readyFulfilledAlready = false;
var ws = new WritableStream({
abort() {
t.fail('Unexpected abort call');
t.end();
},
close() {
return new Promise(resolve => {
setTimeout(() => {
t.ok(readyFulfilledAlready, 'ready should have fulfilled already');
resolve();
}, 50);
});
}
});

// Wait for ws to start.
Expand All @@ -387,18 +419,16 @@ test('If close is called on a WritableStream in waiting state, ready will return
ws.close();
t.equal(ws.state, 'closing', 'state must become closing synchronously on close call');

ws.ready.then(
() => t.fail('ready on ws returned a fulfilled promise unexpectedly'),
r => {
t.equal(r.constructor, TypeError,
'ready must start returning a promise rejected with a TypeError exception');
t.end();
}
);
ws.ready.then(v => {
readyFulfilledAlready = true;
t.equal(ws.state, 'closing', 'state must still be closing when ready fulfills');
t.equal(v, undefined, 'ready promise was fulfilled with undefined');
t.end();
});
}, 0);
});

test('If sink rejects on a WritableStream in writable state, ready will return a rejected promise', t => {
test('If sink rejects on a WritableStream in writable state, ready will return a fulfilled promise', t => {
t.plan(5);

var rejectWritePromise;
Expand All @@ -422,10 +452,10 @@ test('If sink rejects on a WritableStream in writable state, ready will return a
t.equal(r, passedError, 'write() should be rejected with the passed error');
t.equal(ws.state, 'errored', 'state is errored as error is called');

ws.ready.then(
() => t.fail('ready on ws returned a fulfilled promise unexpectedly'),
r => t.equal(r, passedError, 'ready should be rejected with the passed error')
);
ws.ready.then(v => {
t.equal(v, undefined, 'ready promise was fulfilled with undefined');
t.end();
});
}
);
}, 0);
Expand Down Expand Up @@ -459,14 +489,12 @@ test('WritableStream if sink\'s close throws', t => {
},
r => {
t.equal(ws.state, 'errored', 'state must be errored as error is called');
t.equal(r, passedError, 'close() should be rejected with the passed error');

ws.ready.then(
() => t.fail('ready on ws returned a fulfilled promise unexpectedly'),
r => {
t.equal(r, passedError, 'ready should be rejected with the passed error');
t.end();
}
);
ws.ready.then(v => {
t.equal(v, undefined, 'ready promise was fulfilled with undefined');
t.end();
});
}
);
}, 0);
Expand Down Expand Up @@ -500,14 +528,12 @@ test('WritableStream if the promise returned by sink\'s close rejects', t => {
},
r => {
t.equal(ws.state, 'errored', 'state must be errored as error is called');
t.equal(r, passedError, 'close() should be rejected with the passed error');

ws.ready.then(
() => t.fail('ws.ready returned a fulfilled promise'),
r => {
t.equal(r, passedError, 'ready should be rejected with the passed error');
t.end();
}
);
ws.ready.then(v => {
t.equal(v, undefined, 'ready promise was fulfilled with undefined');
t.end();
});
}
);
}, 0);
Expand Down Expand Up @@ -539,10 +565,10 @@ test('If sink rejects on a WritableStream in waiting state, ready will return a
t.equal(r, passedError, 'write() should be rejected with the passed error');
t.equal(ws.state, 'errored', 'state is errored as error is called');

ws.ready.then(
() => t.fail('ready on ws returned a fulfilled promise unexpectedly'),
r => t.equal(r, passedError, 'ready should be rejected with the passed error')
);
ws.ready.then(v => {
t.equal(v, undefined, 'ready promise was fulfilled with undefined');
t.end();
});
}
);
}, 0);
Expand Down

0 comments on commit f40e1ef

Please sign in to comment.