From 63372f5a757f8a7687703aa46621654d1c4e8498 Mon Sep 17 00:00:00 2001 From: Domenic Denicola Date: Tue, 25 Nov 2014 15:35:03 -0500 Subject: [PATCH] Add ability to exclusively lock a readable stream Closes #241. Adds the new getReader() method to readable streams, which returns an ExclusiveStreamReader instance that can be used to read from and observe the stream while denying anyone else the ability to read from or observe it. The implementation of pipeTo makes use of this, and other constructs like the the consume-body algorithm of Fetch are anticipated to make use of this (see e.g. https://github.com/yutakahirano/fetch-with-streams/). --- Locking Design Doc.md | 108 +++++ index.bs | 255 +++++++++++- .../lib/exclusive-stream-reader.js | 87 ++++ .../lib/experimental/readable-byte-stream.js | 30 +- .../lib/readable-stream.js | 68 +++- .../test/exclusive-stream-reader.js | 376 ++++++++++++++++++ reference-implementation/test/pipe-to.js | 71 ++-- 7 files changed, 922 insertions(+), 73 deletions(-) create mode 100644 Locking Design Doc.md create mode 100644 reference-implementation/lib/exclusive-stream-reader.js create mode 100644 reference-implementation/test/exclusive-stream-reader.js diff --git a/Locking Design Doc.md b/Locking Design Doc.md new file mode 100644 index 000000000..7159754ea --- /dev/null +++ b/Locking Design Doc.md @@ -0,0 +1,108 @@ +# Locking a Stream for Exclusive Reading + +In [#241](https://github.com/whatwg/streams/issues/241) we had a great conversation about the need for being able to "lock" a stream for exclusive use. This would be done implicitly while piping, but could also be useful for building user-facing abstractions, as we'll see below. + +What emerged was the idea of a "stream reader," which has most of the readable stream interface, but while it exists you cannot read from the stream except through that reader. + +This document represents some formative rationales for the design of the reader concept, approached from the perspective of a developer that uses increasingly complex features of the streams ecosystem. + +## Developer usage + +### Level 0: no reader usage + +If the developer knows nothing about readers, they can continue using the stream just fine. + +- `read()`, `state`, and `ready` all behave as they do now if used without `pipeTo`. +- `pipeTo` will cause the following side effects: + - `read()` will throw an informative error + - `state` will return `"waiting"` until the pipe completes (successfully or otherwise) + - `ready` will return a promise that remains pending until the pipe completes + +### Level 1: using readers directly + +The developer might want to create their own abstractions that require exclusive access to the stream. For example, a read-to-end function would probably want to avoid others being able to call `.read()` in the middle. + +Example code: + +```js +function readAsJson(rs) { + var string = ""; + var reader = rs.getReader(); + + pump(); + + // These lines would be simpler with `Promise.prototype.finally` (or async functions). + return reader.closed.then( + () => { + reader.releaseLock(); + return JSON.parse(string); + }, + e => { + reader.releaseLock(); + throw e; + } + ); + + function pump() { + while (reader.state === "readable") { + string += reader.read(); + } + if (reader.state === "waiting") { + reader.ready.then(pump); + } + } +} +``` + +The stream would have the same behaviors after being passed to `readAsJson` that it would have after calling its `pipeTo` method. + +The reader should have all of the non-piping-related public interface of the stream. This includes: + +- `closed` getter, which is a pass-through +- `state` and `ready` getters, which reveal the "true" state and state transitions of the stream which the stream itself no longer reveals +- `read()` method, which has the same behavior as that of the stream's except that it works while the stream is locked +- `cancel()` method, which first calls `this.releaseLock()` before the pass-through + +While a stream is locked, it is indistinguishable from a stream that has been drained of all chunks and is not getting any more enqueued. We could consider adding some kind of test, like `stream.isLocked`, to distinguish. However, it's not clear there's a compelling reason for doing so (let us know if so?), and the indistinguishability is kind of a nice property from the perspective of the principle of least authority. + +For readers, you should be able to tell if they're still active (i.e. have not been released) via `reader.isActive`. + +### Level 2: subclassers of `ReadableStream` + +Subclasses of `ReadableStream` should get locking support "for free." The same mechanisms for acquiring and using a lock should work flawlessly. More interestingly, if they wanted to support modifying the behavior of e.g. `read()` (or `state` or `ready` or `closed`), they should only have to override it in one location. + +Which location is more friendly? Probably in `ReadableStream`, so that `ExclusiveStreamReader` still works for `ReadableStream` subclasses. Less work. + +This means `ExclusiveStreamReader` should delegate to `ReadableStream`, and not the other way around. + +### Level 3: custom readable stream implementations? + +It is unclear whether this is necessary, but up until now we have a high level of support for anyone who wants to re-implement the entire `ReadableStream` interface with their own specific code. For example, if you implement `state`, `ready`, `closed`, `read()`, and `cancel()`, you can do `myCustomStream.pipeTo = ReadableStream.prototype.pipeTo` and it will continue to work. + +If we encourage this kind of thing, we should make it easy for custom readable streams to be lockable as well. That basically means `ExclusiveStreamReader` should not require knowledge of `ReadableStream`'s internal slots. + +We can work around this if necessary by passing `ExclusiveStreamReader` any capabilities it needs to manipulate `ReadableStream`'s internal state; then people reimplementing the readable stream interface can do e.g. `new ExclusiveStreamReader(this, { getLock, setLock })` or similar. + +## Optimizability + +The need to support subclassing, via `ExclusiveStreamReader` delegating to the `ReadableStream` implementation, conflicts a bit with the desire for readers to be fast. However, this can be fixed with some cleverness. + +The spec semantics for e.g. `reader.read()` are essentially: + +- Check that `reader@[[stream]]` is locked to `reader`. +- Unlock `reader@[[stream]]`. +- Try `return reader@[[stream]].read()`; finally re-lock `reader@[[stream]]`. + +This will ensure that if `reader@[[stream]]` is a subclass of `ReadableStream`, it will polymorphically dispatch to the subclass's `read` method. However, this kind of try/finally pattern is not very optimizable in V8. + +Here is an optimization that can be performed instead, with slight tweaks to both `ReadableStream.prototype.read` and `ExclusiveStreamReader.prototype.read`: + +- Define `ReadableStream.prototype.read` as: + - Check that `this` is not locked. + - Return `ReadFromReadableStream(this)`. (That is, extract the main functionality, without the check, into its own function.) +- Define `ExclusiveStreamReader.prototype.read` like so: + - Check that `this@[[stream]]` is locked to `this`. + - If `this@[[stream]].read` is equal to the original `ReadableStream.prototype.read`: return `ReadFromReadableStream(this@[[stream]])`. + - Otherwise, proceed via the per-spec semantics above. + +This essentially ensures that all undisturbed readable streams, or readable stream subclasses that do not override `read`, go down the "fast path" by ignoring all the try/finally and lock/unlock business. It is unobservable, since we have checked that `read` has not been modified in any way. diff --git a/index.bs b/index.bs index 8450eae3e..5677ad5b3 100644 --- a/index.bs +++ b/index.bs @@ -148,6 +148,24 @@ based the total size of all chunks in the stream's internal queue. backpressure signal. +

Locking

+ + + +An exclusive stream reader or simply reader is an object that encapsulates a readable stream, +preventing access to the stream except through the reader's interface. We say in this case the stream is +locked to the reader, and that the reader is +active. A readable stream can have at most one reader at a time. + +The reader presents most of the stream's interface, but while it is active, only the reader's methods and properties +can be used to successfully manipulate and interrogate the state of the stream; when the stream is used directly, it +appears as if it is empty. + +A reader also has the capability to release its read lock, which makes it no +longer active. At this point the original stream can be used as before, and the reader becomes inert. If the +encapsulated stream becomes closed or errored as a result of the behavior of its underlying source, any +associated reader will automatically release its lock. +

Readable Streams

Introduction to Readable Streams

@@ -376,6 +394,7 @@ would look like get state() cancel(reason) + getReader() pipeThrough({ writable, readable }, options) pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) read() @@ -433,6 +452,10 @@ Instances of ReadableStream are created with the internal slots des \[[queue]] A List representing the stream's internal queue of chunks + + \[[reader]] + A ExclusiveStreamReader instance, if the stream is locked to an exclusive reader, or + undefined if it is not \[[started]] A boolean flag indicating whether the underlying source has finished starting @@ -493,6 +516,7 @@ Instances of ReadableStream are created with the internal slots des
  • Set this@\[[queue]] to a new empty List.
  • Set this@\[[state]] to "waiting".
  • Set this@\[[started]], this@\[[draining]], and this@\[[pulling]] to false. +
  • Set this@\[[reader]] to undefined.
  • Set this@\[[enqueue]] to CreateReadableStreamEnqueueFunction(this).
  • Set this@\[[close]] to CreateReadableStreamCloseFunction(this).
  • Set this@\[[error]] to CreateReadableStreamErrorFunction(this). @@ -532,6 +556,7 @@ Instances of ReadableStream are created with the internal slots des
      +
    1. If this@\[[reader]] is not undefined, return this@\[[reader]]@\[[lockReleased]].
    2. Return this@\[[readyPromise]].
    @@ -553,9 +578,12 @@ Instances of ReadableStream are created with the internal slots des
    "errored"
    An error occurred interacting with the underlying source, and so the stream is now dead. + + If the stream is locked to a reader, the stream will appear to be "waiting".
      +
    1. If this@\[[reader]] is not undefined, return "waiting".
    2. Return this@\[[state]].
    @@ -565,19 +593,44 @@ Instances of ReadableStream are created with the internal slots des The cancel method signals a loss of interest in the stream by a consumer. Calling it will immediately move the stream to a "closed" state, throwing away any queued data, as well as executing any cancellation mechanism of the underlying source. + + Readable streams cannot be cancelled while locked to a reader; this method will return a rejected promise.
      +
    1. If this@\[[reader]] is not undefined, return a new promise rejected with a TypeError.
    2. If this@\[[state]] is "closed", return a new promise resolved with undefined.
    3. If this@\[[state]] is "errored", return a new promise rejected with this@\[[storedError]].
    4. If this@\[[state]] is "waiting", resolve this@\[[readyPromise]] with undefined.
    5. Let this@\[[queue]] be a new empty List. -
    6. Set this@\[[state]] to "closed". -
    7. Resolve this@\[[closedPromise]] with undefined. +
    8. Call-with-rethrow CloseReadableStream(this).
    9. Let sourceCancelPromise be the result of promise-calling this@\[[onCancel]](reason).
    10. Return the result of transforming sourceCancelPromise by a fulfillment handler that returns undefined.
    +
    getReader()
    + +
    + The getReader method creates an exclusive stream reader and + locks the stream to the the new reader. While the stream is locked, it cannot be + manipulated directly, and will appear to be an inert, empty stream waiting for new chunks to be enqueued. + Instead, the returned reader object can be used to read from or cancel the stream, or to discern its state and state + transitions. If or when the lock is released, the stream can be used again as + normal. + + This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its + entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours, interfering + with your abstraction or observing its side-effects. + + Note that when a stream is closed or errors, any reader it is locked to is automatically released. +
    + +
      +
    1. If this@\[[state]] is "closed", throw a TypeError exception. +
    2. If this@\[[state]] is "errored", throw this@\[[storedError]]. +
    3. Return Construct(ExclusiveStreamReader, (this)). +
    +
    pipeThrough({ writable, readable }, options)
    @@ -621,6 +674,7 @@ look for the pipeTo method.
      +
    1. If this@\[[reader]] is not undefined, throw a TypeError exception.
    2. If this@\[[state]] is "waiting" or "closed", throw a TypeError exception.
    3. If this@\[[state]] is "errored", throw this@\[[storedError]].
    4. Assert: this@\[[state]] is "readable". @@ -628,11 +682,7 @@ look for the pipeTo method.
    5. Let chunk be DequeueValue(this@\[[queue]]).
    6. If this@\[[queue]] is now empty,
        -
      1. If this@\[[draining]] is true, -
          -
        1. Set this@\[[state]] to "closed". -
        2. Resolve this@\[[closedPromise]] with undefined. -
        +
      2. If this@\[[draining]] is true, call-with-rethrow CloseReadableStream(this).
      3. If this@\[[draining]] is false,
        1. Set this@\[[state]] to "waiting". @@ -643,6 +693,172 @@ look for the pipeTo method.
        2. Return chunk.
        +

        Class ExclusiveStreamReader

        + +

        Class Definition

        + +This section is non-normative. + +If one were to write the ExclusiveStreamReader class in something close to the syntax of [[!ECMASCRIPT]], +it would look like + +
        
        +  class ExclusiveStreamReader {
        +    constructor(stream)
        +
        +    get closed()
        +    get isActive()
        +    get ready()
        +    get state()
        +
        +    cancel(reason, ...args)
        +    read(...args)
        +    releaseLock()
        +  }
        +
        + +

        Internal Slots

        + +Instances of ExclusiveStreamReader are created with the internal slots described in the following table: + + + + + + + + + + + + +
        Internal SlotDescription (non-normative)
        \[[stream]] + A ReadableStream instance that this reader encapsulates +
        \[[lockReleased]] + A promise that becomes fulfilled when the reader releases its lock on the stream +
        + +

        new ExclusiveStreamReader(stream)

        + +
          +
        1. If stream does not have a \[[reader]] internal slot, throw a TypeError exception. +
        2. If stream@\[[reader]] is not undefined, throw a TypeError exception. +
        3. Set stream@\[[reader]] to this. +
        4. Set this@\[[stream]] to stream. +
        5. Set this@\[[lockReleased]] to a new promise. +
        + +

        Properties of the ExclusiveStreamReader Prototype

        + +
        get closed
        + +
        + The closed getter for a stream reader simply delegates to the encapsulated stream, to allow consumers to + use the reader interface as they would the readable stream interface. +
        + +
          +
        1. Return Get(this@\[[stream]], "closed"). +
        + +
        get isActive
        + +
        + The isActive getter returns whether or not the stream reader is currently + active. +
        + +
          +
        1. Return SameValue(this@\[[stream]]@\[[reader]], this). +
        + +
        get ready
        + +
        + The ready getter behaves the same as that for the readable stream encapsulated by this reader, except + that while the reader has exclusive access to the stream, the promise returned will reveal the stream's true state + transitions. (In contrast, the stream itself does not signal any state transitions while + locked, giving off the appearance of being "waiting" for the + duration.) +
        + +
          +
        1. If SameValue(this@\[[stream]]@\[[reader]], this) is false, return + Get(this@\[[stream]], "ready"). +
        2. Set this@\[[stream]]@\[[reader]] to undefined. +
        3. Let readyResult be Get(this@\[[stream]], "ready"). +
        4. Set this@\[[stream]]@\[[reader]] to this. +
        5. Return readyResult. +
        + +
        get state
        + +
        + The state getter behaves the same as that for the readable stream encapsulated by this reader, except + that while the reader has exclusive access to the stream, it will reveal the stream's true state. (In contrast, the + stream itself gives off the appearance of being "waiting" while it is + locked.) +
        + +
          +
        1. If SameValue(this@\[[stream]]@\[[reader]], this) is false, return + Get(this@\[[stream]], "state"). +
        2. Set this@\[[stream]]@\[[reader]] to undefined. +
        3. Let stateResult be Get(this@\[[stream]], "state"). +
        4. Set this@\[[stream]]@\[[reader]] to this. +
        5. Return stateResult. +
        + +
        cancel(...args)
        + +
        + If the reader is active, the cancel method will + release the lock and then call through to the stream's own cancel + method. +
        + +
          +
        1. If SameValue(this@\[[stream]]@\[[reader]], this) is false, return a new promise rejected with a + TypeError. +
        2. Call-with-rethrow Invoke(this, "releaseLock"). +
        3. Return Invoke(this@\[[stream]], "cancel", args). +
        + +The length property of the cancel method is 1. + +
        read(...args)
        + +
        + If the reader is active, the read method behaves the same as that for the + encapsulated stream, except that the reader will be able to use its exclusive access to the stream to retrieve + chunks. (In contrast, the stream itself will not allow any chunks to be read from it while it is + locked.) +
        + +
          +
        1. If SameValue(this@\[[stream]]@\[[reader]], this) is false, throw a TypeError + exception. +
        2. Set this@\[[stream]]@\[[reader]] to undefined. +
        3. Let readResult be Invoke(this@\[[stream]], "read", args). +
        4. Set this@\[[stream]]@\[[reader]] to this. +
        5. Return readResult. +
        + +
        releaseLock()
        + +
        + The releaseLock method releases the reader's lock on the encapsulated + stream. After the lock is released, the reader is no longer active, and its + cancel and read methods will fail, while its closed, ready, and + state getters will simply delegate to the encapsulated stream. +
        + +
          +
        1. If SameValue(this@\[[stream]]@\[[reader]], this) is false, return undefined. +
        2. Set this@\[[stream]]@\[[reader]] to undefined. +
        3. Resolve this@\[[lockReleased]] with undefined. +
        +

        Readable Stream Abstract Operations

        CallReadableStreamPull ( stream )

        @@ -664,6 +880,15 @@ look for the pipeTo method.
      4. Otherwise, return undefined.
      +

      CloseReadableStream ( stream )

      + +
        +
      1. Set stream@\[[state]] to "closed". +
      2. Resolve stream@\[[closedPromise]] with undefined. +
      3. If stream@\[[reader]] is not undefined, call-with-rethrow Invoke(stream@\[[reader]], "releaseLock"). +
      4. Return undefined. +
      +

      CreateReadableStreamCloseFunction ( stream )

        @@ -677,8 +902,7 @@ A Readable Stream Close Function is a built-in anonymous function of
      1. If stream@\[[state]] is "waiting",
        1. Resolve stream@\[[readyPromise]] with undefined. -
        2. Resolve stream@\[[closedPromise]] with undefined. -
        3. Set stream@\[[state]] to "closed". +
        4. Return CloseReadableStream(this).
      2. If stream@\[[state]] is "readable",
          @@ -728,19 +952,14 @@ A Readable Stream Error Function is a built-in anonymous function of a variable stream, that performs the following steps:
            -
          1. If stream@\[[state]] is "waiting", -
              -
            1. Set stream@\[[state]] to "errored". -
            2. Set stream@\[[storedError]] to e. -
            3. Resolve stream@\[[readyPromise]] with undefined. -
            4. Reject stream@\[[closedPromise]] with e. -
            -
          2. If stream@\[[state]] is "readable", +
          3. If stream@\[[state]] is "waiting", resolve stream@\[[readyPromise]] with undefined. +
          4. If stream@\[[state]] is "readable", let stream@\[[queue]] be a new empty List. +
          5. If stream@\[[state]] is either "waiting" or "readable",
              -
            1. Let stream@\[[queue]] be a new empty List.
            2. Set stream@\[[state]] to "errored".
            3. Set stream@\[[storedError]] to e.
            4. Reject stream@\[[closedPromise]] with e. +
            5. If stream@\[[reader]] is not undefined, call-with-rethrow Invoke(stream@\[[reader]], "releaseLock").
          diff --git a/reference-implementation/lib/exclusive-stream-reader.js b/reference-implementation/lib/exclusive-stream-reader.js new file mode 100644 index 000000000..04371f9d5 --- /dev/null +++ b/reference-implementation/lib/exclusive-stream-reader.js @@ -0,0 +1,87 @@ +var assert = require('assert'); + +export default class ExclusiveStreamReader { + constructor(stream) { + if (!('_reader' in stream)) { + throw new TypeError('ExclusiveStreamReader can only be used with ReadableStream objects or subclasses'); + } + + if (stream._reader !== undefined) { + throw new TypeError('This stream has already been locked for exclusive reading by another reader'); + } + + stream._reader = this; + + this._stream = stream; + + this._lockReleased = new Promise(resolve => { + this._lockReleased_resolve = resolve; + }); + } + + get ready() { + if (this._stream._reader !== this) { + return this._stream.ready; + } + + this._stream._reader = undefined; + try { + return this._stream.ready; + } finally { + this._stream._reader = this; + } + } + + get state() { + if (this._stream._reader !== this) { + return this._stream.state; + } + + this._stream._reader = undefined; + try { + return this._stream.state; + } finally { + this._stream._reader = this; + } + } + + get closed() { + return this._stream.closed; + } + + get isActive() { + return this._stream._reader === this; + } + + read(...args) { + if (this._stream._reader !== this) { + throw new TypeError('This stream reader has released its lock on the stream and can no longer be used'); + } + + this._stream._reader = undefined; + try { + return this._stream.read(...args); + } finally { + this._stream._reader = this; + } + } + + cancel(reason, ...args) { + if (this._stream._reader !== this) { + return Promise.reject( + new TypeError('This stream reader has released its lock on the stream and can no longer be used')); + } + + this.releaseLock(); + return this._stream.cancel(reason, ...args); + } + + releaseLock() { + if (this._stream._reader !== this) { + return undefined; + } + + this._stream._reader = undefined; + this._lockReleased_resolve(undefined); + } +} diff --git a/reference-implementation/lib/experimental/readable-byte-stream.js b/reference-implementation/lib/experimental/readable-byte-stream.js index 7696181b1..0e4ac4acf 100644 --- a/reference-implementation/lib/experimental/readable-byte-stream.js +++ b/reference-implementation/lib/experimental/readable-byte-stream.js @@ -57,6 +57,7 @@ export default class ReadableByteStream { } } + this._reader = undefined; this._state = 'waiting'; this._onReadInto = readInto; @@ -80,6 +81,10 @@ export default class ReadableByteStream { } get state() { + if (this._reader !== undefined) { + return 'waiting'; + } + return this._state; } @@ -169,14 +174,11 @@ export default class ReadableByteStream { return resizedArrayBuffer; } - // Note: We plan to make this more efficient in the future. But for now this - // implementation suffices to show interoperability with a generic - // WritableStream. - pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { - ReadableStream.prototype.pipeTo.call(this, dest, {preventClose, preventAbort, preventCancel}); - } - get ready() { + if (this._reader !== undefined) { + return this._reader._lockReleased; + } + return this._readyPromise; } @@ -192,6 +194,7 @@ export default class ReadableByteStream { } this._state = 'closed'; + this._reader = undefined; this._resolveClosedPromise(undefined); return new Promise((resolve, reject) => { @@ -208,6 +211,10 @@ export default class ReadableByteStream { } get closed() { + if (this._reader !== undefined) { + return this._reader._lockReleased.then(() => this._closedPromise); + } + return this._closedPromise; } @@ -223,3 +230,12 @@ export default class ReadableByteStream { this._closedPromise_reject = null; } } + +// Note: We plan to make this more efficient in the future. But for now this +// implementation suffices to show interoperability with a generic +// WritableStream. +ReadableByteStream.prototype.pipeTo = ReadableStream.prototype.pipeTo; + +// These can be direct copies. Per spec though they probably should not be === since that might preclude optimizations. +ReadableByteStream.prototype.pipeThrough = ReadableStream.prototype.pipeThrough; +ReadableByteStream.prototype.getReader = ReadableStream.prototype.getReader; diff --git a/reference-implementation/lib/readable-stream.js b/reference-implementation/lib/readable-stream.js index 504559b8c..9657f385a 100644 --- a/reference-implementation/lib/readable-stream.js +++ b/reference-implementation/lib/readable-stream.js @@ -2,6 +2,7 @@ var assert = require('assert'); import * as helpers from './helpers'; import { DequeueValue, EnqueueValueWithSize, GetTotalQueueSize } from './queue-with-sizes'; import CountQueuingStrategy from './count-queuing-strategy'; +import ExclusiveStreamReader from './exclusive-stream-reader'; export default class ReadableStream { constructor({ @@ -30,6 +31,7 @@ export default class ReadableStream { this._started = false; this._draining = false; this._pulling = false; + this._reader = undefined; this._enqueue = CreateReadableStreamEnqueueFunction(this); this._close = CreateReadableStreamCloseFunction(this); @@ -50,10 +52,19 @@ export default class ReadableStream { } get state() { + if (this._reader !== undefined) { + return 'waiting'; + } + return this._state; } cancel(reason) { + if (this._reader !== undefined) { + return Promise.reject( + new TypeError('This stream is locked to a single exclusive reader and cannot be cancelled directly')); + } + if (this._state === 'closed') { return Promise.resolve(undefined); } @@ -65,13 +76,23 @@ export default class ReadableStream { } this._queue = []; - this._state = 'closed'; - this._resolveClosedPromise(undefined); + CloseReadableStream(this); var sourceCancelPromise = helpers.promiseCall(this._onCancel, reason); return sourceCancelPromise.then(() => undefined); } + getReader() { + if (this._state === 'closed') { + throw new TypeError('The stream has already been closed, so a reader cannot be acquired.'); + } + if (this._state === 'errored') { + throw this._storedError; + } + + return new ExclusiveStreamReader(this); + } + pipeThrough({ writable, readable }, options) { if (!helpers.typeIsObject(writable)) { throw new TypeError('A transform stream must have an writable property that is an object.'); @@ -86,11 +107,11 @@ export default class ReadableStream { } pipeTo(dest, { preventClose, preventAbort, preventCancel } = {}) { - var source = this; preventClose = Boolean(preventClose); preventAbort = Boolean(preventAbort); preventCancel = Boolean(preventCancel); + var source; var resolvePipeToPromise; var rejectPipeToPromise; @@ -98,6 +119,7 @@ export default class ReadableStream { resolvePipeToPromise = resolve; rejectPipeToPromise = reject; + source = this.getReader(); doPipe(); }); @@ -137,12 +159,16 @@ export default class ReadableStream { function cancelSource(reason) { if (preventCancel === false) { + // implicitly releases the lock source.cancel(reason); + } else { + source.releaseLock(); } rejectPipeToPromise(reason); } function closeDest() { + source.releaseLock(); if (preventClose === false) { dest.close().then(resolvePipeToPromise, rejectPipeToPromise); } else { @@ -151,6 +177,7 @@ export default class ReadableStream { } function abortDest(reason) { + source.releaseLock(); if (preventAbort === false) { dest.abort(reason); } @@ -159,6 +186,10 @@ export default class ReadableStream { } read() { + if (this._reader !== undefined) { + throw new TypeError('This stream is locked to a single exclusive reader and cannot be read from directly'); + } + if (this._state === 'waiting') { throw new TypeError('no chunks available (yet)'); } @@ -176,8 +207,7 @@ export default class ReadableStream { if (this._queue.length === 0) { if (this._draining === true) { - this._state = 'closed'; - this._resolveClosedPromise(undefined); + CloseReadableStream(this); } else { this._state = 'waiting'; this._initReadyPromise(); @@ -190,6 +220,10 @@ export default class ReadableStream { } get ready() { + if (this._reader !== undefined) { + return this._reader._lockReleased; + } + return this._readyPromise; } @@ -261,8 +295,7 @@ function CreateReadableStreamCloseFunction(stream) { return () => { if (stream._state === 'waiting') { stream._resolveReadyPromise(undefined); - stream._resolveClosedPromise(undefined); - stream._state = 'closed'; + return CloseReadableStream(stream); } if (stream._state === 'readable') { stream._draining = true; @@ -312,16 +345,18 @@ function CreateReadableStreamEnqueueFunction(stream) { function CreateReadableStreamErrorFunction(stream) { return e => { if (stream._state === 'waiting') { - stream._state = 'errored'; - stream._storedError = e; stream._resolveReadyPromise(undefined); - stream._rejectClosedPromise(e); } - else if (stream._state === 'readable') { + if (stream._state === 'readable') { stream._queue = []; + } + if (stream._state === 'waiting' || stream._state === 'readable') { stream._state = 'errored'; stream._storedError = e; stream._rejectClosedPromise(e); + if (stream._reader !== undefined) { + stream._reader.releaseLock(); + } } }; } @@ -339,6 +374,17 @@ function ShouldReadableStreamApplyBackpressure(stream) { return shouldApplyBackpressure; } +function CloseReadableStream(stream) { + stream._state = 'closed'; + stream._resolveClosedPromise(undefined); + + if (stream._reader !== undefined) { + stream._reader.releaseLock(); + } + + return undefined; +} + var defaultReadableStreamStrategy = { shouldApplyBackpressure(queueSize) { assert(typeof queueSize === 'number' && !Number.isNaN(queueSize)); diff --git a/reference-implementation/test/exclusive-stream-reader.js b/reference-implementation/test/exclusive-stream-reader.js new file mode 100644 index 000000000..02539c787 --- /dev/null +++ b/reference-implementation/test/exclusive-stream-reader.js @@ -0,0 +1,376 @@ +var test = require('tape'); + +import ReadableStream from '../lib/readable-stream'; + +test('Using the reader directly on a mundane stream', t => { + t.plan(22); + + var rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + setTimeout(() => enqueue('b'), 30); + setTimeout(close, 60); + } + }); + + t.equal(rs.state, 'readable', 'stream starts out readable'); + + var reader = rs.getReader(); + + t.equal(reader.isActive, true, 'reader isActive is true'); + + t.equal(rs.state, 'waiting', 'after getting a reader, the stream state is waiting'); + t.equal(reader.state, 'readable', 'the reader state is readable'); + + t.throws(() => rs.read(), /TypeError/, 'trying to read from the stream directly throws a TypeError'); + t.equal(reader.read(), 'a', 'trying to read from the reader works and gives back the first enqueued value'); + t.equal(reader.state, 'waiting', 'the reader state is now waiting since the queue has been drained'); + rs.cancel().then( + () => t.fail('cancel() should not be fulfilled'), + e => t.equal(e.constructor, TypeError, 'cancel() should be rejected with a TypeError') + ); + + reader.ready.then(() => { + t.equal(reader.state, 'readable', 'ready for reader is fulfilled when second chunk is enqueued'); + t.equal(rs.state, 'waiting', 'the stream state is still waiting'); + t.equal(reader.read(), 'b', 'you can read the second chunk from the reader'); + }); + + reader.closed.then(() => { + t.pass('closed for the reader is fulfilled'); + t.equal(reader.state, 'closed', 'the reader state is closed'); + t.equal(rs.state, 'closed', 'the stream state is closed'); + t.equal(reader.isActive, false, 'the reader is no longer active'); + + t.doesNotThrow(() => reader.releaseLock(), 'trying to release the lock twice does nothing'); + }); + + rs.ready.then(() => { + t.equal(rs.state, 'closed', 'ready for stream is not fulfilled until the stream closes'); + t.equal(reader.isActive, false, 'the reader is no longer active after the stream has closed'); + }); + + rs.closed.then(() => { + t.pass('closed for the stream is fulfilled'); + t.equal(rs.state, 'closed', 'the stream state is closed'); + t.equal(reader.state, 'closed', 'the reader state is closed'); + t.equal(reader.isActive, false, 'the reader is no longer active'); + }); +}); + +test('Readers delegate to underlying stream implementations', t => { + t.plan(3 * 3 + 2 * 4); + + var rs = new ReadableStream(); + var reader = rs.getReader(); + + testGetter('ready'); + testGetter('state'); + testGetter('closed'); + testMethod('read'); + testMethod('cancel'); + + // Generates 4 assertions + function testGetter(propertyName) { + Object.defineProperty(rs, propertyName, { + get() { + t.pass('overriden ' + propertyName + ' called'); + t.equal(this, rs, propertyName + ' called with the correct this value'); + return propertyName + ' return value'; + } + }); + t.equal(reader[propertyName], propertyName + ' return value', + `reader's ${propertyName} returns the return value of the stream's ${propertyName}`); + } + + // Generates 5 assertions + function testMethod(methodName) { + var testArgs = ['arg1', 'arg2', 'arg3']; + rs[methodName] = function (...args) { + t.pass('overridden ' + methodName + ' called'); + t.deepEqual(args, testArgs, methodName + ' called with the correct arguments'); + t.equal(this, rs, methodName + ' called with the correct this value'); + return methodName + ' return value'; + } + t.equal(reader[methodName](...testArgs), methodName + ' return value', + `reader's ${methodName} returns the return value of the stream's ${methodName}`); + } +}); + +test('Reading from a reader for an empty stream throws but doesn\'t break anything', t => { + var enqueue; + var rs = new ReadableStream({ + start(e) { + enqueue = e; + } + }); + var reader = rs.getReader(); + + t.equal(reader.isActive, true, 'reader is active to start with'); + t.equal(reader.state, 'waiting', 'reader state is waiting to start with'); + t.throws(() => reader.read(), /TypeError/, 'calling reader.read() throws a TypeError'); + t.equal(reader.isActive, true, 'reader is still active'); + t.equal(reader.state, 'waiting', 'reader state is still waiting'); + + enqueue('a'); + + reader.ready.then(() => { + t.equal(reader.state, 'readable', 'after enqueuing the reader state is readable'); + t.equal(reader.read(), 'a', 'the enqueued chunk can be read back through the reader'); + t.end(); + }); +}); + +test('Trying to use a released reader should work for ready/state/closed but fail for read/cancel', t => { + t.plan(9); + + var rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + enqueue('b'); + setTimeout(close, 40); + } + }); + var reader = rs.getReader(); + reader.releaseLock(); + + t.equal(reader.isActive, false, 'isActive returns false'); + t.equal(reader.state, 'readable', 'reader.state returns readable'); + t.equal(rs.state, 'readable', 'rs.state returns readable'); + + t.throws(() => reader.read(), /TypeError/, 'trying to read gives a TypeError'); + reader.cancel().then( + () => t.fail('reader.cancel() should not be fulfilled'), + e => t.equal(e.constructor, TypeError, 'reader.cancel() should be rejected with a TypeError') + ); + + reader.ready.then(() => { + t.pass('reader.ready should be fulfilled'); + t.equal(rs.read(), 'a', 'reading from the stream should give back the first enqueued chunk'); + t.equal(rs.read(), 'b', 'reading from the stream should give back the second enqueued chunk'); + }); + reader.closed.then(() => t.pass('reader.closed should be fulfilled')); +}); + +test('cancel() on a reader implicitly releases the reader before calling through', t => { + t.plan(3); + + var passedReason = new Error('it wasn\'t the right time, sorry'); + var rs = new ReadableStream({ + cancel(reason) { + t.equal(reason, passedReason, 'the cancellation reason is passed through to the underlying source'); + } + }); + + var reader = rs.getReader(); + reader.cancel(passedReason).then( + () => t.pass('reader.cancel() should fulfill'), + e => t.fail('reader.cancel() should not reject') + ); + + t.equal(reader.isActive, false, 'canceling via the reader should release the reader\'s lock'); +}); + +test('cancel() on a reader calls this.releaseLock directly instead of cheating', t => { + t.plan(3); + + var rs = new ReadableStream(); + + var reader = rs.getReader(); + reader.releaseLock = function (...args) { + t.pass('releaseLock was called directly'); + t.equal(args.length, 0, 'no arguments were passed'); + t.equal(this, reader, 'the correct this value was passed'); + }; + + reader.cancel(); +}); + +test('getReader() on a closed stream should fail', t => { + var rs = new ReadableStream({ + start(enqueue, close) { + close(); + } + }); + + t.equal(rs.state, 'closed', 'the stream should be closed'); + t.throws(() => rs.getReader(), /TypeError/, 'getReader() threw a TypeError'); + t.end(); +}); + +test('getReader() on a cancelled stream should fail (since cancelling closes)', t => { + var rs = new ReadableStream(); + rs.cancel(new Error('fun time is over')); + + t.equal(rs.state, 'closed', 'the stream should be closed'); + t.throws(() => rs.getReader(), /TypeError/, 'getReader() threw a TypeError'); + t.end(); +}); + +test('getReader() on an errored stream should rethrow the error', t => { + var theError = new Error('don\'t say i didn\'t warn ya'); + var rs = new ReadableStream({ + start(enqueue, close, error) { + error(theError); + } + }); + + t.equal(rs.state, 'errored', 'the stream should be errored'); + t.throws(() => rs.getReader(), /don't say i didn't warn ya/, 'getReader() threw the error'); + t.end(); +}); + +test('closed should be fulfilled after reader releases its lock (both .closed accesses after acquiring)', t => { + t.plan(2); + + var doClose; + var rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + var reader = rs.getReader(); + doClose(); + + reader.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when reader closed is fulfilled'); + }); + + rs.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when stream closed is fulfilled'); + }); +}); + +test('closed should be fulfilled after reader releases its lock (stream .closed access before acquiring)', t => { + t.plan(2); + + var doClose; + var rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + rs.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when stream closed is fulfilled'); + }); + + var reader = rs.getReader(); + doClose(); + + reader.closed.then(() => { + t.equal(reader.isActive, false, 'reader is no longer active when reader closed is fulfilled'); + }); +}); + +test('closed should be fulfilled after reader releases its lock (multiple stream locks)', t => { + t.plan(6); + + var doClose; + var rs = new ReadableStream({ + start(enqueue, close) { + doClose = close; + } + }); + + var reader1 = rs.getReader(); + + rs.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when stream closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when stream closed is fulfilled'); + }); + + reader1.releaseLock(); + + var reader2 = rs.getReader(); + doClose(); + + reader1.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when reader1 closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when reader1 closed is fulfilled'); + }); + + reader2.closed.then(() => { + t.equal(reader1.isActive, false, 'reader1 is no longer active when reader2 closed is fulfilled'); + t.equal(reader2.isActive, false, 'reader2 is no longer active when reader2 closed is fulfilled'); + }); +}); + +test('Multiple readers can access the stream in sequence', t => { + var rs = new ReadableStream({ + start(enqueue, close) { + enqueue('a'); + enqueue('b'); + enqueue('c'); + enqueue('d'); + enqueue('e'); + close(); + } + }); + + t.equal(rs.read(), 'a', 'reading the first chunk directly from the stream works'); + + var reader1 = rs.getReader(); + t.equal(reader1.read(), 'b', 'reading the second chunk from reader1 works'); + reader1.releaseLock(); + + t.equal(rs.read(), 'c', 'reading the third chunk from the stream after releasing reader1 works'); + + var reader2 = rs.getReader(); + t.equal(reader2.read(), 'd', 'reading the fourth chunk from reader2 works'); + reader2.releaseLock(); + + t.equal(rs.read(), 'e', 'reading the fifth chunk from the stream after releasing reader2 works'); + + t.end(); +}); + +test('A stream that errors has that reflected in the reader and the stream', t => { + t.plan(9); + + var error; + var rs = new ReadableStream({ + start(enqueue, close, error_) { + error = error_; + } + }); + + var reader = rs.getReader(); + + var passedError = new Error('too exclusive'); + error(passedError); + + t.equal(reader.isActive, false, 'the reader should have lost its lock'); + t.throws(() => reader.read(), /TypeError/, + 'reader.read() should throw a TypeError since the reader no longer has a lock'); + t.equal(reader.state, 'errored', 'the reader\'s state should be errored'); + reader.ready.then(() => t.pass('reader.ready should fulfill')); + reader.closed.then( + () => t.fail('reader.closed should not be fulfilled'), + e => t.equal(e, passedError, 'reader.closed should be rejected with the stream error') + ); + + t.throws(() => rs.read(), /too exclusive/, 'rs.read() should throw the stream error'); + t.equal(rs.state, 'errored', 'the stream\'s state should be errored'); + rs.ready.then(() => t.pass('rs.ready should fulfill')); + rs.closed.then( + () => t.fail('rs.closed should not be fulfilled'), + e => t.equal(e, passedError, 'rs.closed should be rejected with the stream error') + ); +}); + +test('Cannot use an already-released reader to unlock a stream again', t => { + t.plan(2); + + var rs = new ReadableStream(); + + var reader1 = rs.getReader(); + reader1.releaseLock(); + + var reader2 = rs.getReader(); + t.equal(reader2.isActive, true, 'reader2 state is active before releasing reader1'); + + reader1.releaseLock(); + t.equal(reader2.isActive, true, 'reader2 state is still active after releasing reader1 again'); +}); diff --git a/reference-implementation/test/pipe-to.js b/reference-implementation/test/pipe-to.js index 7436b0ae8..7f81ced6c 100644 --- a/reference-implementation/test/pipe-to.js +++ b/reference-implementation/test/pipe-to.js @@ -128,34 +128,30 @@ test('Piping from a ReadableStream in readable state to a WritableStream in erro }); test('Piping from a ReadableStream in closed state to a WritableStream in writable state', t => { + t.plan(3); + var rs = new ReadableStream({ start(enqueue, close) { close(); }, pull() { t.fail('Unexpected pull call'); - t.end(); }, cancel(reason) { t.fail('Unexpected cancel call'); - t.end(); } }); t.equal(rs.state, 'closed'); - var closeCalled = false; var ws = new WritableStream({ write() { t.fail('Unexpected write call'); - t.end(); }, close() { - t.assert(!closeCalled); - closeCalled = true; + t.fail('Unexpected close call'); }, abort() { t.fail('Unexpected abort call'); - t.end(); } }); @@ -163,42 +159,39 @@ test('Piping from a ReadableStream in closed state to a WritableStream in writab setTimeout(() => { t.equal(ws.state, 'writable'); - rs.pipeTo(ws); - t.assert(closeCalled); - t.equal(ws.state, 'closing'); - t.end(); + rs.pipeTo(ws).then( + () => t.fail('pipeTo promise should not be fulfilled'), + e => t.equal(e.constructor, TypeError, 'pipeTo promise should be rejected with a TypeError') + ); }, 0); }); test('Piping from a ReadableStream in errored state to a WritableStream in writable state', t => { + t.plan(3); + + var theError = new Error('piping is too hard today'); var rs = new ReadableStream({ start(enqueue, close, error) { - error(); + error(theError); }, pull() { t.fail('Unexpected pull call'); - t.end(); }, cancel(reason) { t.fail('Unexpected cancel call'); - t.end(); } }); t.equal(rs.state, 'errored'); - var abortCalled = false; var ws = new WritableStream({ write() { t.fail('Unexpected write call'); - t.end(); }, close() { t.fail('Unexpected close call'); - t.end(); }, abort() { - t.assert(!abortCalled); - abortCalled = true; + t.fail('Unexpected abort call'); } }); @@ -206,14 +199,10 @@ test('Piping from a ReadableStream in errored state to a WritableStream in writa setTimeout(() => { t.equal(ws.state, 'writable'); - rs.pipeTo(ws); - - // Need to delay because pipeTo retrieves error from dest using ready. - setTimeout(() => { - t.assert(abortCalled); - t.equal(ws.state, 'errored'); - t.end(); - }, 0); + rs.pipeTo(ws).then( + () => t.fail('pipeTo promise should not be fulfilled'), + e => t.equal(e, theError, 'pipeTo promise should be rejected with the passed error') + ); }, 0); }); @@ -365,6 +354,8 @@ test('Piping from a ReadableStream in waiting state which becomes readable after test('Piping from a ReadableStream in waiting state which becomes errored after pipeTo call to a WritableStream in ' + 'writable state', t => { + t.plan(4); + var errorReadableStream; var rs = new ReadableStream({ start(enqueue, close, error) { @@ -392,7 +383,6 @@ test('Piping from a ReadableStream in waiting state which becomes errored after }, abort(reason) { t.equal(reason, passedError); - t.end(); } }); @@ -504,7 +494,7 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait t.equal(ws.state, 'waiting'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'transfer of data must not happen until ws becomes writable'); + t.equal(rs.state, 'waiting', 'readable stream must say it is waitable while piping (even with a nonempty queue)'); t.equal(ws.state, 'waiting'); resolveWritePromise(); @@ -562,8 +552,9 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait setTimeout(() => { t.equal(ws.state, 'waiting'); + t.equal(rs.state, 'readable', 'readable stream should be readable before piping starts'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'transfer of data must not happen until ws becomes writable'); + t.equal(rs.state, 'waiting', 'readable stream must say it is waitable while piping (even with a nonempty queue)'); t.equal(ws.state, 'waiting'); errorWritableStream(); @@ -573,6 +564,8 @@ test('Piping from a ReadableStream in readable state to a WritableStream in wait test('Piping from a ReadableStream in readable state which becomes errored after pipeTo call to a WritableStream in ' + 'waiting state', t => { + t.plan(10); + var errorReadableStream; var pullCount = 0; var rs = new ReadableStream({ @@ -602,10 +595,9 @@ test('Piping from a ReadableStream in readable state which becomes errored after }, close() { t.fail('Unexpected close call'); - t.end(); }, abort() { - t.end(); + t.pass('underlying source abort was called'); } }); ws.write('Hello'); @@ -615,8 +607,9 @@ test('Piping from a ReadableStream in readable state which becomes errored after t.equal(ws.state, 'waiting'); t.equal(pullCount, 1); + t.equal(rs.state, 'readable', 'readable stream should be readable before piping starts'); rs.pipeTo(ws); - t.equal(rs.state, 'readable', 'transfer of data must not happen until ws becomes writable'); + t.equal(rs.state, 'waiting', 'readable stream must say it is waitable while piping (even with a nonempty queue)'); t.equal(ws.state, 'waiting'); errorReadableStream(); @@ -738,6 +731,8 @@ test('Piping from a ReadableStream in waiting state to a WritableStream in waiti test('Piping from a ReadableStream in waiting state which becomes closed after pipeTo call to a WritableStream in ' + 'waiting state', t => { + t.plan(5); + var closeReadableStream; var pullCount = 0; var rs = new ReadableStream({ @@ -783,20 +778,22 @@ test('Piping from a ReadableStream in waiting state which becomes closed after p rs.pipeTo(ws); closeReadableStream(); + t.equal(rs.state, 'closed'); + // Check that nothing happens. setTimeout(() => { t.equal(ws.state, 'closing'); t.equal(pullCount, 1); - - t.end(); }, 100); }); }); test('Piping from a ReadableStream in waiting state which becomes errored after pipeTo call to a WritableStream in ' + 'waiting state', t => { + t.plan(6); + var errorReadableStream; var pullCount = 0; var rs = new ReadableStream({ @@ -833,7 +830,6 @@ test('Piping from a ReadableStream in waiting state which becomes errored after t.equal(reason, passedError); t.assert(writeCalled); t.equal(pullCount, 1); - t.end(); } }); ws.write('Hello'); @@ -845,6 +841,7 @@ test('Piping from a ReadableStream in waiting state which becomes errored after rs.pipeTo(ws); errorReadableStream(passedError); + t.equal(rs.state, 'errored'); }); }); @@ -1094,7 +1091,7 @@ test('Piping to a stream that errors on the last chunk does not pass through the setTimeout(() => { t.equal(cancelCalled, false, 'cancel must not be called'); - t.equal(ws.state, 'errored', 'the writable stream must still be in an errored state'); + t.equal(ws.state, 'errored'); t.end(); }, 20); });