From 89a0cbc3a325b93316b9a9b14b036fa39629d399 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Mon, 9 Oct 2023 11:52:25 +0900 Subject: [PATCH] feat(ext/web): cancel support for TransformStream --- ext/web/06_streams.js | 118 ++++++++++++++++++++++++++++++++----- ext/web/lib.deno_web.d.ts | 1 + tools/wpt/expectation.json | 24 ++------ 3 files changed, 109 insertions(+), 34 deletions(-) diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 9c6191fceeb0a8..6da57624821d03 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -317,6 +317,7 @@ const _controller = Symbol("[[controller]]"); const _detached = Symbol("[[Detached]]"); const _disturbed = Symbol("[[disturbed]]"); const _errorSteps = Symbol("[[ErrorSteps]]"); +const _finishPromise = Symbol("[[finishPromise]]"); const _flushAlgorithm = Symbol("[[flushAlgorithm]]"); const _globalObject = Symbol("[[globalObject]]"); const _highWaterMark = Symbol("[[highWaterMark]]"); @@ -609,8 +610,7 @@ function initializeTransformStream( } function cancelAlgorithm(reason) { - transformStreamErrorWritableAndUnblockWrite(stream, reason); - return resolvePromiseWith(undefined); + return transformStreamDefaultSourceCancelAlgorithm(stream, reason); } stream[_readable] = createReadableStream( @@ -3690,12 +3690,14 @@ function setUpReadableStreamDefaultReader(reader, stream) { * @param {TransformStreamDefaultController} controller * @param {(chunk: O, controller: TransformStreamDefaultController) => Promise} transformAlgorithm * @param {(controller: TransformStreamDefaultController) => Promise} flushAlgorithm + * @param {(reason: any) => Promise} cancelAlgorithm */ function setUpTransformStreamDefaultController( stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm, ) { assert(ObjectPrototypeIsPrototypeOf(TransformStreamPrototype, stream)); assert(stream[_controller] === undefined); @@ -3703,6 +3705,7 @@ function setUpTransformStreamDefaultController( stream[_controller] = controller; controller[_transformAlgorithm] = transformAlgorithm; controller[_flushAlgorithm] = flushAlgorithm; + controller[_cancelAlgorithm] = cancelAlgorithm; } /** @@ -3730,6 +3733,8 @@ function setUpTransformStreamDefaultControllerFromTransformer( }; /** @type {(controller: TransformStreamDefaultController) => Promise} */ let flushAlgorithm = () => resolvePromiseWith(undefined); + /** @type {(reason: any) => Promise} */ + let cancelAlgorithm = () => resolvePromiseWith(undefined); if (transformerDict.transform !== undefined) { transformAlgorithm = (chunk, controller) => webidl.invokeCallbackFunction( @@ -3752,11 +3757,23 @@ function setUpTransformStreamDefaultControllerFromTransformer( true, ); } + if (transformerDict.cancel !== undefined) { + cancelAlgorithm = (reason) => + webidl.invokeCallbackFunction( + transformerDict.cancel, + [reason], + transformer, + webidl.converters["Promise"], + "Failed to call 'cancelAlgorithm' on 'TransformStreamDefaultController'", + true, + ); + } setUpTransformStreamDefaultController( stream, controller, transformAlgorithm, flushAlgorithm, + cancelAlgorithm, ); } @@ -3938,6 +3955,7 @@ function setUpWritableStreamDefaultWriter(writer, stream) { function transformStreamDefaultControllerClearAlgorithms(controller) { controller[_transformAlgorithm] = undefined; controller[_flushAlgorithm] = undefined; + controller[_cancelAlgorithm] = undefined; } /** @@ -4007,13 +4025,33 @@ function transformStreamDefaultControllerTerminate(controller) { } /** - * @param {TransformStream} stream + * @template I + * @template O + * @param {TransformStream} stream * @param {any=} reason * @returns {Promise} */ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { - transformStreamError(stream, reason); - return resolvePromiseWith(undefined); + const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const readable = stream[_readable]; + controller[_finishPromise] = new Deferred(); + const cancelPromise = controller[_cancelAlgorithm](reason); + transformStreamDefaultControllerClearAlgorithms(controller); + transformPromiseWith(cancelPromise, () => { + if (readable[_state] === "errored") { + controller[_finishPromise].reject(readable[_storedError]); + } else { + readableStreamDefaultControllerError(readable[_controller], reason); + controller[_finishPromise].resolve(undefined); + } + }, (r) => { + readableStreamDefaultControllerError(readable[_controller], r); + controller[_finishPromise].reject(r); + }); + return controller[_finishPromise].promise; } /** @@ -4023,21 +4061,26 @@ function transformStreamDefaultSinkAbortAlgorithm(stream, reason) { * @returns {Promise} */ function transformStreamDefaultSinkCloseAlgorithm(stream) { - const readable = stream[_readable]; const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const readable = stream[_readable]; + controller[_finishPromise] = new Deferred(); const flushPromise = controller[_flushAlgorithm](controller); transformStreamDefaultControllerClearAlgorithms(controller); - return transformPromiseWith(flushPromise, () => { + transformPromiseWith(flushPromise, () => { if (readable[_state] === "errored") { - throw readable[_storedError]; + controller[_finishPromise].reject(readable[_storedError]); + } else { + readableStreamDefaultControllerClose(readable[_controller]); + controller[_finishPromise].resolve(undefined); } - readableStreamDefaultControllerClose( - /** @type {ReadableStreamDefaultController} */ readable[_controller], - ); }, (r) => { - transformStreamError(stream, r); - throw readable[_storedError]; + readableStreamDefaultControllerError(readable[_controller], r); + controller[_finishPromise].reject(r); }); + return controller[_finishPromise].promise; } /** @@ -4069,6 +4112,38 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) { return transformStreamDefaultControllerPerformTransform(controller, chunk); } +/** + * @template I + * @template O + * @param {TransformStream} stream + * @param {any=} reason + * @returns {Promise} + */ +function transformStreamDefaultSourceCancelAlgorithm(stream, reason) { + const controller = stream[_controller]; + if (controller[_finishPromise] !== undefined) { + return controller[_finishPromise].promise; + } + const writable = stream[_writable]; + controller[_finishPromise] = new Deferred(); + const cancelPromise = controller[_cancelAlgorithm](reason); + transformStreamDefaultControllerClearAlgorithms(controller); + transformPromiseWith(cancelPromise, () => { + if (writable[_state] === "errored") { + controller[_finishPromise].reject(writable[_storedError]); + } else { + writableStreamDefaultControllerErrorIfNeeded(writable[_controller], reason); + transformStreamUnblockWrite(stream); + controller[_finishPromise].resolve(undefined); + } + }, (r) => { + writableStreamDefaultControllerErrorIfNeeded(writable[_controller], r); + transformStreamUnblockWrite(stream); + controller[_finishPromise].reject(r); + }); + return controller[_finishPromise].promise; +} + /** * @param {TransformStream} stream * @returns {Promise} @@ -4104,9 +4179,7 @@ function transformStreamErrorWritableAndUnblockWrite(stream, e) { stream[_writable][_controller], e, ); - if (stream[_backpressure] === true) { - transformStreamSetBackpressure(stream, false); - } + transformStreamUnblockWrite(stream); } /** @@ -4122,6 +4195,15 @@ function transformStreamSetBackpressure(stream, backpressure) { stream[_backpressure] = backpressure; } +/** + * @param {TransformStream} stream + */ +function transformStreamUnblockWrite(stream) { + if (stream[_backpressure] === true) { + transformStreamSetBackpressure(stream, false); + } +} + /** * @param {WritableStream} stream * @param {any=} reason @@ -6007,6 +6089,10 @@ const TransformStreamPrototype = TransformStream.prototype; /** @template O */ class TransformStreamDefaultController { + /** @type {(reason: any) => Promise} */ + [_cancelAlgorithm]; + /** @type {Promise | undefined} */ + [_finishPromise]; /** @type {(controller: this) => Promise} */ [_flushAlgorithm]; /** @type {TransformStream} */ diff --git a/ext/web/lib.deno_web.d.ts b/ext/web/lib.deno_web.d.ts index 331c9536b1b45d..bcc0c12d804211 100644 --- a/ext/web/lib.deno_web.d.ts +++ b/ext/web/lib.deno_web.d.ts @@ -933,6 +933,7 @@ declare interface Transformer { readableType?: undefined; start?: TransformStreamDefaultControllerCallback; transform?: TransformStreamDefaultControllerTransformCallback; + cancel?: (reason: any) => Promise; writableType?: undefined; } diff --git a/tools/wpt/expectation.json b/tools/wpt/expectation.json index 6181dcf40f0a33..9c628d73278eee 100644 --- a/tools/wpt/expectation.json +++ b/tools/wpt/expectation.json @@ -2493,32 +2493,20 @@ "transform-streams": { "backpressure.any.html": true, "backpressure.any.worker.html": true, - "errors.any.html": [ - "controller.error() should close writable immediately after readable.cancel()" - ], - "errors.any.worker.html": [ - "controller.error() should close writable immediately after readable.cancel()" - ], + "errors.any.html": true, + "errors.any.worker.html": true, "flush.any.html": true, "flush.any.worker.html": true, - "general.any.html": [ - "terminate() should abort writable immediately after readable.cancel()" - ], - "general.any.worker.html": [ - "terminate() should abort writable immediately after readable.cancel()" - ], + "general.any.html": true, + "general.any.worker.html": true, "lipfuzz.any.html": true, "lipfuzz.any.worker.html": true, "patched-global.any.html": true, "patched-global.any.worker.html": true, "properties.any.html": true, "properties.any.worker.html": true, - "reentrant-strategies.any.html": [ - "writer.abort() inside size() should work" - ], - "reentrant-strategies.any.worker.html": [ - "writer.abort() inside size() should work" - ], + "reentrant-strategies.any.html": true, + "reentrant-strategies.any.worker.html": true, "strategies.any.html": true, "strategies.any.worker.html": true, "terminate.any.html": true,