From f40e1ef812210be535d20848d1919535296bafc8 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Wed, 26 Nov 2014 15:17:10 -0500 Subject: [PATCH] Make writable stream's ready promise fulfill-only Closes #245. Now .ready is just a signal that the stream has transitioned from "waiting" to any other state. --- Examples.md | 4 +- index.bs | 23 ++-- .../lib/readable-stream.js | 2 +- .../lib/writable-stream.js | 16 +-- reference-implementation/test/pipe-to.js | 22 ++-- .../test/writable-stream-abort.js | 22 ++-- .../test/writable-stream.js | 110 +++++++++++------- 7 files changed, 101 insertions(+), 98 deletions(-) diff --git a/Examples.md b/Examples.md index 2b39f2750..6687739db 100644 --- a/Examples.md +++ b/Examples.md @@ -294,9 +294,7 @@ function promptAndWrite(myStream) { }); } else if (writableStream.state === "waiting") { console.log("Waiting for the stream to flush to the underlying sink, please hold..."); - writableStream.ready - .then(promptAndWrite) - .catch(e => console.error("While flushing, an error occurred: ", e)); + writableStream.ready.then(promptAndWrite); } else if (writableStream.state === "errored") { console.error("Error writing; this session is over!"); } diff --git a/index.bs b/index.bs index 637066f1e..2debb2763 100644 --- a/index.bs +++ b/index.bs @@ -1027,8 +1027,7 @@ Instances of WritableStream are created with the internal slots des \[[readyPromise]] - A promise that becomes fulfilled when the stream becomes "writable", and is replaced with a new - pending promise when the stream becomes "waiting"; returned by the ready getter + A promise returned by the ready getter \[[writing]] @@ -1122,11 +1121,12 @@ Instances of WritableStream are created with the internal slots des
get ready
- The ready getter returns a promise that will be fulfilled when the stream enters the - "writable" state, i.e., when the stream's internal queue is not full according to its queuing - strategy. (The promise will be rejected if the stream errors.) + The ready getter returns a promise that will be fulfilled when the stream transitions away from the + "waiting" state to any other state. Once the stream transitions back to "waiting", the + getter will return a new promise that stays pending until the next state transition. - In essence, this promise gives a signal as to when any backpressure has let up. + In essence, this promise gives a signal as to when any backpressure has let up (or that the stream has been closed + or errored).
    @@ -1194,10 +1194,7 @@ Instances of WritableStream are created with the internal slots des TypeError exception.
  1. If this@\[[state]] is "errored", return a promise rejected with this@\[[storedError]]. -
  2. If this@\[[state]] is "writable", set this@\[[readyPromise]] to a new promise - rejected with a TypeError exception. -
  3. If this@\[[state]] is "waiting", reject this@\[[readyPromise]] with a - TypeError exception. +
  4. If this@\[[state]] is "waiting", resolve this@\[[readyPromise]] with undefined.
  5. Set this@\[[state]] to "closing"
  6. Call-with-rethrow EnqueueValueWithSize(this@\[[queue]], "close", 0).
  7. Call-with-rethrow CallOrScheduleWritableStreamAdvanceQueue(this). @@ -1301,10 +1298,8 @@ a variable stream, that performs the following steps: e.
  • Set stream@\[[storedError]] to e. -
  • If stream@\[[state]] is "writable" or "closing", set - stream@\[[readyPromise]] to a new promise rejected with e. -
  • If stream@\[[state]] is "waiting", reject stream@\[[readyPromise]] with - e. +
  • If stream@\[[state]] is "waiting", resolve stream@\[[readyPromise]] with + undefined.
  • Reject stream@\[[closedPromise]] with e.
  • Set stream@\[[state]] to "errored". diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 2bb425303..504559b8c 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -119,7 +119,7 @@ export default class ReadableStream { if (source.state === 'readable') { Promise.race([source.closed, dest.ready]).then(doPipe, doPipe); } else if (source.state === 'waiting') { - Promise.race([source.ready, dest.ready]).then(doPipe, doPipe); + Promise.race([source.ready, dest.ready]).then(doPipe); } else if (source.state === 'errored') { source.closed.catch(abortDest); } else if (source.state === 'closed') { diff --git a/reference-implementation/lib/writable-stream.js b/reference-implementation/lib/writable-stream.js index c57349f05..b61b41d95 100644 --- a/reference-implementation/lib/writable-stream.js +++ b/reference-implementation/lib/writable-stream.js @@ -36,7 +36,6 @@ export default class WritableStream { this._readyPromise = Promise.resolve(undefined); this._readyPromise_resolve = null; - this._readyPromise_reject = null; this._queue = []; this._state = 'writable'; @@ -87,13 +86,8 @@ export default class WritableStream { if (this._state === 'errored') { return Promise.reject(this._storedError); } - if (this._state === 'writable') { - this._readyPromise = Promise.reject(new TypeError('stream has already been closed')); - this._readyPromise_resolve = null; - this._readyPromise_reject = null; - } if (this._state === 'waiting') { - this._readyPromise_reject(new TypeError('stream has already been closed')); + this._readyPromise_resolve(undefined); } this._state = 'closing'; @@ -203,13 +197,8 @@ function CreateWritableStreamErrorFunction(stream) { stream._storedError = e; - if (stream._state === 'writable' || stream._state === 'closing') { - stream._readyPromise = Promise.reject(e); - stream._readyPromise_resolve = null; - stream._readyPromise_reject = null; - } if (stream._state === 'waiting') { - stream._readyPromise_reject(e); + stream._readyPromise_resolve(undefined); } stream._closedPromise_reject(e); stream._state = 'errored'; @@ -231,7 +220,6 @@ function SyncWritableStreamStateWithQueue(stream) { stream._state = 'waiting'; stream._readyPromise = new Promise((resolve, reject) => { stream._readyPromise_resolve = resolve; - stream._readyPromise_reject = reject; }); } diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 8cf88915b..7436b0ae8 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -112,20 +112,18 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro ws.write('Hello'); t.assert(writeCalled, 'write must be called'); - ws.ready.then( - () => t.fail('ready promise unexpectedly fulfilled'), - () => { - t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state'); + ws.ready.then(() => { + t.equal(ws.state, 'errored', 'as a result of rejected promise, ws must be in errored state'); - rs.pipeTo(ws); + rs.pipeTo(ws); - // Need to delay because pipeTo retrieves error from dest using ready. - setTimeout(() => { - t.assert(cancelCalled); - t.equal(rs.state, 'closed'); - t.end(); - }, 0); - }); + // Need to delay because pipeTo retrieves error from dest using ready. + setTimeout(() => { + t.assert(cancelCalled); + t.equal(rs.state, 'closed'); + t.end(); + }, 0); + }); }, 0); }); diff --git a/reference-implementation/test/writable-stream-abort.js b/reference-implementation/test/writable-stream-abort.js index 4b6911663..8f4bf0bcc 100644 --- a/reference-implementation/test/writable-stream-abort.js +++ b/reference-implementation/test/writable-stream-abort.js @@ -98,7 +98,7 @@ test('Aborting a WritableStream passes through the given reason', t => { }); test('Aborting a WritableStream puts it in an errored state, with stored error equal to the abort reason', t => { - t.plan(6); + t.plan(5); var recordedReason; var ws = new WritableStream(); @@ -113,11 +113,6 @@ test('Aborting a WritableStream puts it in an errored state, with stored error e r => t.equal(r, passedReason, 'writing should reject with the given reason') ); - ws.ready.then( - () => t.fail('ready should not succeed'), - r => t.equal(r, passedReason, 'ready should reject with the given reason') - ); - ws.close().then( () => t.fail('closing should not succeed'), r => t.equal(r, passedReason, 'closing should reject with the given reason') @@ -134,18 +129,21 @@ test('Aborting a WritableStream puts it in an errored state, with stored error e ); }); -test('Aborting a WritableStream causes any outstanding ready promises to be rejected with the abort reason', t => { +test('Aborting a WritableStream causes any outstanding ready promises to be fulfilled immediately', t => { t.plan(2); var recordedReason; - var ws = new WritableStream({}); + var ws = new WritableStream({ + write(chunk) { + return new Promise(() => {}); // forever-pending, so normally .ready would not fulfill. + } + }); ws.write('a'); t.equal(ws.state, 'waiting', 'state should be waiting'); - ws.ready.then( - () => t.fail('ready should not succeed'), - r => t.equal(r, passedReason, 'ready should reject with the given reason') - ); + ws.ready.then(() => { + t.equal(ws.state, 'errored', 'state should now be errored'); + }); var passedReason = new Error('Sorry, it just wasn\'t meant to be.'); ws.abort(passedReason); diff --git a/reference-implementation/test/writable-stream.js b/reference-implementation/test/writable-stream.js index 3c5a43cb8..cfeac93e4 100644 --- a/reference-implementation/test/writable-stream.js +++ b/reference-implementation/test/writable-stream.js @@ -341,7 +341,7 @@ test('WritableStream if write returns a rejected promise, queued write and close }, 0); }); -test('If close is called on a WritableStream in writable state, ready will return a rejected promise', t => { +test('If close is called on a WritableStream in writable state, ready will return a fulfilled promise', t => { var ws = new WritableStream({ write() { t.fail('Unexpected write call'); @@ -350,7 +350,7 @@ test('If close is called on a WritableStream in writable state, ready will retur abort() { t.fail('Unexpected abort call'); t.end(); - }, + } }); // Wait for ws to start. @@ -360,23 +360,55 @@ test('If close is called on a WritableStream in writable state, ready will retur ws.close(); t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); - ws.ready.then( - () => t.fail('ready on ws returned a fulfilled promise unexpectedly'), - r => { - t.equal(r.constructor, TypeError, - 'ready must start returning a promise rejected with a TypeError exception'); - t.end(); - } - ); + ws.ready.then(v => { + t.equal(ws.state, 'closed', 'state must be closed by the time ready fulfills (because microtasks ordering)'); + t.equal(v, undefined, 'ready promise was fulfilled with undefined'); + t.end(); + }); }, 0); }); -test('If close is called on a WritableStream in waiting state, ready will return a rejected promise', t => { +test('If close is called on a WritableStream in waiting state, ready will return a fulfilled promise', t => { + var ws = new WritableStream({ + abort() { + t.fail('Unexpected abort call'); + t.end(); + } + }); + + // Wait for ws to start. + setTimeout(() => { + ws.write('a'); + t.equal(ws.state, 'waiting', 'state must become waiting synchronously on write call'); + + ws.close(); + t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); + + ws.ready.then(v => { + t.equal(ws.state, 'closing', 'state must still be closing when ready fulfills'); + t.equal(v, undefined, 'ready promise was fulfilled with undefined'); + t.end(); + }); + }, 0); +}); + +test('If close is called on a WritableStream in waiting state, ready will be fulfilled immediately even if close ' + + 'takes a long time', t => { + + var readyFulfilledAlready = false; var ws = new WritableStream({ abort() { t.fail('Unexpected abort call'); t.end(); }, + close() { + return new Promise(resolve => { + setTimeout(() => { + t.ok(readyFulfilledAlready, 'ready should have fulfilled already'); + resolve(); + }, 50); + }); + } }); // Wait for ws to start. @@ -387,18 +419,16 @@ test('If close is called on a WritableStream in waiting state, ready will return ws.close(); t.equal(ws.state, 'closing', 'state must become closing synchronously on close call'); - ws.ready.then( - () => t.fail('ready on ws returned a fulfilled promise unexpectedly'), - r => { - t.equal(r.constructor, TypeError, - 'ready must start returning a promise rejected with a TypeError exception'); - t.end(); - } - ); + ws.ready.then(v => { + readyFulfilledAlready = true; + t.equal(ws.state, 'closing', 'state must still be closing when ready fulfills'); + t.equal(v, undefined, 'ready promise was fulfilled with undefined'); + t.end(); + }); }, 0); }); -test('If sink rejects on a WritableStream in writable state, ready will return a rejected promise', t => { +test('If sink rejects on a WritableStream in writable state, ready will return a fulfilled promise', t => { t.plan(5); var rejectWritePromise; @@ -422,10 +452,10 @@ test('If sink rejects on a WritableStream in writable state, ready will return a t.equal(r, passedError, 'write() should be rejected with the passed error'); t.equal(ws.state, 'errored', 'state is errored as error is called'); - ws.ready.then( - () => t.fail('ready on ws returned a fulfilled promise unexpectedly'), - r => t.equal(r, passedError, 'ready should be rejected with the passed error') - ); + ws.ready.then(v => { + t.equal(v, undefined, 'ready promise was fulfilled with undefined'); + t.end(); + }); } ); }, 0); @@ -459,14 +489,12 @@ test('WritableStream if sink\'s close throws', t => { }, r => { t.equal(ws.state, 'errored', 'state must be errored as error is called'); + t.equal(r, passedError, 'close() should be rejected with the passed error'); - ws.ready.then( - () => t.fail('ready on ws returned a fulfilled promise unexpectedly'), - r => { - t.equal(r, passedError, 'ready should be rejected with the passed error'); - t.end(); - } - ); + ws.ready.then(v => { + t.equal(v, undefined, 'ready promise was fulfilled with undefined'); + t.end(); + }); } ); }, 0); @@ -500,14 +528,12 @@ test('WritableStream if the promise returned by sink\'s close rejects', t => { }, r => { t.equal(ws.state, 'errored', 'state must be errored as error is called'); + t.equal(r, passedError, 'close() should be rejected with the passed error'); - ws.ready.then( - () => t.fail('ws.ready returned a fulfilled promise'), - r => { - t.equal(r, passedError, 'ready should be rejected with the passed error'); - t.end(); - } - ); + ws.ready.then(v => { + t.equal(v, undefined, 'ready promise was fulfilled with undefined'); + t.end(); + }); } ); }, 0); @@ -539,10 +565,10 @@ test('If sink rejects on a WritableStream in waiting state, ready will return a t.equal(r, passedError, 'write() should be rejected with the passed error'); t.equal(ws.state, 'errored', 'state is errored as error is called'); - ws.ready.then( - () => t.fail('ready on ws returned a fulfilled promise unexpectedly'), - r => t.equal(r, passedError, 'ready should be rejected with the passed error') - ); + ws.ready.then(v => { + t.equal(v, undefined, 'ready promise was fulfilled with undefined'); + t.end(); + }); } ); }, 0);