From b67cd8d0ac8a11961133d3c39a606116b5547ae5 Mon Sep 17 00:00:00 2001 From: Luke Wagner Date: Mon, 9 Dec 2024 17:19:50 -0600 Subject: [PATCH] [WIP] add waitable-set --- design/mvp/Async.md | 245 +++++++++++++----- design/mvp/Binary.md | 13 +- design/mvp/CanonicalABI.md | 146 +++++++---- design/mvp/Explainer.md | 193 ++++++++++---- design/mvp/canonical-abi/definitions.py | 186 ++++++++++---- design/mvp/canonical-abi/run_tests.py | 325 +++++++++++++++--------- 6 files changed, 762 insertions(+), 346 deletions(-) diff --git a/design/mvp/Async.md b/design/mvp/Async.md index 59a1d7e4..ace782b1 100644 --- a/design/mvp/Async.md +++ b/design/mvp/Async.md @@ -15,6 +15,7 @@ summary of the motivation and animated sketch of the design in action. * [Sync and Async Functions](#sync-and-async-functions) * [Task](#task) * [Current task](#current-task) + * [Current execution context](#current-execution-context) * [Subtask and Supertask](#subtask-and-supertask) * [Structured concurrency](#structured-concurrency) * [Streams and Futures](#streams-and-futures) @@ -73,7 +74,7 @@ these languages' concurrency features are already bound (making the Component Model "just another OS" from the language toolchains' perspective). Moreover, this async ABI does not require components to use preemptive -multi-threading ([`thread.spawn`]) in order to achieve concurrency. Instead, +multi-threading ([`canon thread.spawn`]) in order to achieve concurrency. Instead, concurrency can be achieved by cooperatively switching between different logical tasks running on a single thread. This switching may require the use of [fibers] or a [CPS transform], but may also be avoided entirely when a @@ -177,6 +178,56 @@ although there can be multiple live `Task` objects in a component instance, "the current one" is always clear: it's the one passed to the current function as a parameter. 
+### Current Execution Context + +In addition to there always being a well-defined [current task], there is also +always a well-defined **current execution context**, each of which contains a +distinct mutable **context-local storage array** that can be read and written +from core wasm code by calling the `context.get` and `context.set` built-ins. + +The context-local storage array's length is currently fixed to contain exactly +2 `i32`s with the goal of allowing this array to be stored inline in whatever +existing runtime data structure is already efficiently reachable from ambient +compiled wasm code. Because module instantiation is declarative in the +Component Model, the imported `context.{get,set}` built-ins can be inlined by +the core wasm compiler as-if they were instructions, allowing the final +generated machine code to be a single load or store. This makes context-local +storage the ideal place to store both the linear-memory [shadow stack] pointer +as well as the pointer to the struct used to implement [thread-local storage] +APIs used by guest code. + +(In the future, `context.{get,set}` can be backwards-compatibly relaxed to +allow `i64`s in support of [memory64] pointers. Also, the length of the storage +array could be backwards-compatibly increased over time if necessary. When +[wasm-gc] is supported, the `i32` values can serve as indices into +guest-managed tables of typed references.) + +Unlike traditional thread-local storage, context-local storage abstracts the +identity of the underlying [kernel threads]: +* Each new [task] creates a fresh (zero-initialized) execution context to + serve as the current execution context for all core wasm frames *in the + same component instance* nested under that export call.
+* Each new thread created by [`canon thread.spawn`] *also* creates a fresh + (zero-initialized) execution context to serve as the current execution + context for all core wasm frames *in the same component instance* nested + under the spawned function call. + +In both cases, if core wasm calls into another component, this callee creates a +new task which, according to the first bullet, becomes the current execution context +for all core wasm calls in the callee component. + +Together, these rules support the implementation of traditional POSIX-style +[thread-local storage] in a `main()`-style program while also encapsulating the +identity of the thread used to call a component's export. This encapsulation is +beneficial for general modularity reasons but also practically useful for +avoiding subtle leaks relating to the ambiguous lifetime of context-local +storage, once allocated for a particular calling thread. With the above rule, +context-local storage can be deallocated as soon as the callee's task +completes, which the callee has clear knowledge and control of. + +For details, see [`canon context.get`] in the AST explainer and +[`canon_context_get`] in the Canonical ABI explainer. + ### Subtask and Supertask Each component-to-component call necessarily creates a new task in the callee. @@ -307,32 +358,53 @@ maintained for streams and futures by the Canonical ABI. When a component asynchronously lowers an import, it is explicitly requesting that, if the import blocks, control flow be returned back to the calling task so that it can do something else. Similarly, if `stream.read` or `stream.write` -would block, they return a "blocked" code so that the caller can continue to -make progress on other things. But eventually, a task will run out of other -things to do and will need to **wait** for progress on one of the task's -subtasks, readable stream ends, writable stream ends, readable future ends or -writable future ends, which are collectively called its **waitables**.
While a -task is waiting on its waitables, the Component Model runtime can switch to -other running tasks or start new tasks by invoking exports. - -The Canonical ABI provides two ways for a task to wait: -* The task can call the [`task.wait`] built-in to synchronously wait for - progress. This is specified in the Canonical ABI by the [`canon_task_wait`] - function. -* The task can specify a `callback` function (in the `canon lift` definition) - and return to the event loop to wait for notification of progress by a call - to the `callback` function. This is specified in the Canonical ABI by - the `opts.callback` case in [`canon_lift`]. +are called asynchronously and would block, they return a "blocked" code so that +the caller can continue to make progress on other things. But eventually, a +task will run out of other things to do and will need to **wait** for progress +on one of the task's subtasks, reads or writes, which are collectively called +its **waitables**. The Canonical ABI Python represents waitables with a common +[`Waitable`] base class. While a task is waiting, the Component Model runtime +can switch to other running tasks or start new tasks by invoking exports. + +To avoid the O(N) cost of processing an N-ary list of waitables every time a +task needs to wait (which is the classic performance bottleneck of, e.g., POSIX +`select()`), the Canonical ABI allows waitables to be maintained in **waitable +sets** which (like `epoll()`) can be waited upon as a whole for any one of the +member waitables to make progress. Waitable sets are independent of tasks; +tasks can wait on different waitable sets over time and a single waitable set +can be waited upon by multiple tasks at once. Waitable sets are local to a +component instance and cannot be shared across component boundaries. 
+ +The Canonical ABI provides two ways for a task to wait on a waitable set: +* Core wasm can pass (the index of) the waitable set as a parameter to the + [`canon waitable-set.wait`] built-in which blocks and returns the event that + occurred. +* If the task has a `callback` function (specified in [`canon lift`]), core + wasm can return (the index of) the waitable set as a return value to the + event loop, which will block and then pass the event that occurred as a + parameter to the `callback`. While the two approaches have significant runtime implementation differences (the former requires [fibers] or a [CPS transform] while the latter only -requires storing a small `i32` "context" in the task), semantically they do the -same thing which, in the Canonical ABI Python code, is factored out into -[`Task`]'s `wait` method. Thus, the difference between `callback` and -non-`callback` is mostly one of optimization, not expressivity. - -The Canonical ABI Python represents waitables with a common [`Waitable`] -base class. +requires storing the fixed-size context-local storage in the task), +semantically they do the same thing which, in the Canonical ABI Python code, is +factored out into the [`Task.wait`] method. Thus, the difference between +`callback` and non-`callback` is one of optimization, not expressivity. + +In addition to *waiting* for an event to occur, a task can also *poll* for +whether an event has already occurred. Polling does not block, but does allow +other tasks to be switched to and executed. Polling is opportunistic, allowing +the servicing of higher-priority events in the middle of longer-running +computations; when there is nothing left to do, a task must *wait*. A task +can poll by either calling [`canon waitable-set.poll`] or, when using a +`callback`, by returning a Canonical-ABI-defined "poll" code to the event loop +along with (the index of) the waitable set to poll. 
+ +Lastly, if a long-running task wants to allow other tasks to execute, without +having any of its own subtasks to wait on, it can simply *yield* which allows +other tasks to be scheduled before continuing execution in the current task. A +task can yield by either calling [`canon yield`] or, when using a `callback`, +by returning a Canonical-ABI-defined "yield" code to the event loop. ### Backpressure @@ -341,16 +413,16 @@ export calls can start piling up, each consuming some of the component's finite private resources (like linear memory), requiring the component to be able to exert *backpressure* to allow some tasks to finish (and release private resources) before admitting new async export calls. To do this, a component may -call the `task.backpressure` built-in to set a "backpressure" flag that causes -subsequent export calls to immediately return in the "starting" state without -calling the component's Core WebAssembly code. +call the [`canon backpressure.set`] built-in to set a component-instance-wide +"backpressure" flag that causes subsequent export calls to immediately return +in the "starting" state without calling the component's Core WebAssembly code. Once task enables backpressure, it can [wait](#waiting) for existing tasks to finish and release their associated resources. Thus, a task can choose to [wait](#waiting) with or without backpressure enabled, depending on whether it wants to accept new accept new export calls while waiting or not. -See the [`canon_task_backpressure`] function and [`Task.enter`] method in the +See the [`canon_backpressure_set`] function and [`Task.enter`] method in the Canonical ABI explainer for the setting and implementation of backpressure. Once a task is allowed to start according to these backpressure rules, its @@ -359,7 +431,7 @@ the "started" state. 
### Returning -The way an async function returns its value is by calling [`task.return`], +The way an async function returns its value is by calling [`canon task.return`], passing the core values that are to be lifted as *parameters*. Additionally, when the `always-task-return` `canonopt` is set, synchronous functions also return their values by calling `task.return` (as a more expressive and @@ -400,18 +472,28 @@ replaced with `...` to focus on the overall flow of function calls. (import "libc" "mem" (memory 1)) (import "libc" "realloc" (func (param i32 i32 i32 i32) (result i32))) (import "" "fetch" (func $fetch (param i32 i32) (result i32))) + (import "" "waitable-set.new" (func $new_waitable_set (result i32))) + (import "" "waitable-set.wait" (func $wait (param i32 i32) (result i32))) + (import "" "waitable.join" (func $join (param i32 i32))) (import "" "task.return" (func $task_return (param i32 i32))) - (import "" "task.wait" (func $wait (param i32) (result i32))) + (global $ws (mut i32)) + (func $start + (global.set $ws (call $new_waitable_set)) + ) + (start $start) (func (export "summarize") (param i32 i32) ... loop ... call $fetch ;; pass a pointer-to-string and pointer-to-list-of-bytes outparam ... ;; ... and receive the index of a new async subtask + global.get $ws + call $join ;; ... and add it to the waitable set + ... end loop ;; loop as long as there are any subtasks ... - call $task_wait ;; wait for a subtask to make progress + call $wait ;; wait for a subtask in the waitable set to make progress ... end ... @@ -423,14 +505,18 @@ replaced with `...` to focus on the overall flow of function calls. 
(alias $libc "mem" (core memory $mem)) (alias $libc "realloc" (core func $realloc)) (canon lower $fetch async (memory $mem) (realloc $realloc) (core func $fetch')) + (canon waitable-set.new (core func $new)) + (canon waitable-set.wait async (memory $mem) (core func $wait)) + (canon waitable.join (core func $join)) (canon task.return (result string) async (memory $mem) (realloc $realloc) (core func $task_return)) - (canon task.wait async (memory $mem) (core func $task_wait)) (core instance $main (instantiate $Main (with "" (instance (export "mem" (memory $mem)) (export "realloc" (func $realloc)) (export "fetch" (func $fetch')) + (export "waitable-set.new" (func $new)) + (export "waitable-set.wait" (func $wait)) + (export "waitable.join" (func $join)) (export "task.return" (func $task_return)) - (export "task.wait" (func $task_wait)) )))) (canon lift (core func $main "summarize") async (memory $mem) (realloc $realloc) @@ -441,25 +527,21 @@ replaced with `...` to focus on the overall flow of function calls. Because the imported `fetch` function is `canon lower`ed with `async`, its core function type (shown in the first import of `$Main`) takes pointers to the parameter and results (which are asynchronously read-from and written-to) and -returns the index of a new subtask. `summarize` calls `task.wait` repeatedly -until all `fetch` subtasks have finished, noting that `task.wait` can return -intermediate progress (as subtasks transition from "starting" to "started" to -"returned") which tell the surrounding core wasm code that it can reclaim the -memory passed arguments or use the results that have now been written to the -outparam memory. +returns the index of a new subtask. 
`summarize` calls `waitable-set.wait` +repeatedly until all `fetch` subtasks have finished, noting that +`waitable-set.wait` can return intermediate progress (as subtasks transition +from "starting" to "started" to "returned") which tells the surrounding core +wasm code that it can reclaim the memory of passed arguments or use the results +that have now been written to the outparam memory. Because the `summarize` function is `canon lift`ed with `async`, its core -function type has no results, since results are passed out via `task.return`. -It also means that multiple `summarize` calls can be active at once: once the -first call to `task.wait` blocks, the runtime will suspend its callstack +function type has no results; results are passed out via `task.return`. It also +means that multiple `summarize` calls can be active at once: once the first +call to `waitable-set.wait` blocks, the runtime will suspend its callstack (fiber) and start a new stack for the new call to `summarize`. Thus, `summarize` must be careful to allocate a separate linear-memory stack in its -entry point, if one is needed, and to save and restore this before and after -calling `task.wait`. - -(Note that, for brevity this example ignores the `memory` and `realloc` -immediates required by `canon lift` and `canon lower` to allocate the `list` -param and `string` result, resp.) +entry point and store it in context-local storage (via `context.set`) instead +of simply using a `global`, as in a synchronous function. This same example can be re-written to use the `callback` immediate (thereby avoiding the need for fibers) as follows. Note that the internal structure of @@ -480,37 +562,55 @@ not externally-visible behavior.
(import "libc" "mem" (memory 1)) (import "libc" "realloc" (func (param i32 i32 i32 i32) (result i32))) (import "" "fetch" (func $fetch (param i32 i32) (result i32))) + (import "" "waitable-set.new" (func $new_waitable_set (result i32))) + (import "" "waitable.join" (func $join (param i32 i32))) (import "" "task.return" (func $task_return (param i32 i32))) + (global $ws (mut i32)) + (func $start + (global.set $ws (call $new_waitable_set)) + ) + (start $start) (func (export "summarize") (param i32 i32) (result i32) ... loop ... call $fetch ;; pass a pointer-to-string and pointer-to-list-of-bytes outparam ... ;; ... and receive the index of a new async subtask + global.get $ws + call $join ;; ... and add it to the waitable set + ... end - ... ;; return a non-zero "cx" value passed to the next call to "cb" + (i32.or ;; (WAIT | (waitable-set-index << 4)) + (i32.const 1) ;; 1 = WAIT + (i32.shl + (global.get $ws) + (i32.const 4))) ) - (func (export "cb") (param $cx i32) (param $event i32) (param $p1 i32) (param $p2 i32) + (func (export "cb") (param $event i32) (param $p1 i32) (param $p2 i32) ... - if ... subtasks remain ... - get_local $cx - return ;; wait for another subtask to make progress + if (result i32) ;; if subtasks remain: + i32.const 1 ;; 1 = WAIT + else ;; if no subtasks remain: + ... + call $task_return ;; return the string result (pointer,length) + ... + i32.const 0 ;; 0 = EXIT end - ... - call $task_return ;; return the string result (pointer,length) - ... 
- i32.const 0 ;; return zero to signal that this task is done ) ) (core instance $libc (instantiate $Libc)) (alias $libc "mem" (core memory $mem)) (alias $libc "realloc" (core func $realloc)) (canon lower $fetch async (memory $mem) (realloc $realloc) (core func $fetch')) + (canon waitable-set.new (core func $new)) + (canon waitable.join (core func $join)) (canon task.return (result string) async (memory $mem) (realloc $realloc) (core func $task_return)) (core instance $main (instantiate $Main (with "" (instance (export "mem" (memory $mem)) (export "realloc" (func $realloc)) (export "fetch" (func $fetch')) + (export "waitable-set.new" (func $new)) + (export "waitable.join" (func $join)) (export "task.return" (func $task_return)) )))) (canon lift (core func $main "summarize") @@ -519,6 +619,9 @@ not externally-visible behavior. (export "summarize" (func $summarize)) ) ``` +For an explanation of the bitpacking of the `i32` callback return value, +see [`unpack_callback_result`] in the Canonical ABI explainer. + While this example spawns all the subtasks in the initial call to `summarize`, subtasks can also be spawned from `cb` (even after the call to `task.return`). It's also possible for `summarize` to call `task.return` called eagerly in the @@ -531,7 +634,7 @@ these values is defined by the Canonical ABI. ## Interaction with multi-threading -For now, the integration between multi-threading (via [`thread.spawn`]) and +For now, the integration between multi-threading (via [`canon thread.spawn`]) and native async is limited. 
In particular, because all [lift and lower definitions] produce non-`shared` functions, any threads spawned by a component via `thread.spawn` will not be able to directly call imports (synchronously @@ -607,25 +710,33 @@ comes after: [CPS Transform]: https://en.wikipedia.org/wiki/Continuation-passing_style [Event Loop]: https://en.wikipedia.org/wiki/Event_loop [Structured Concurrency]: https://en.wikipedia.org/wiki/Structured_concurrency +[Thread-local Storage]: https://en.wikipedia.org/wiki/Thread-local_storage +[Kernel Threads]: https://en.wikipedia.org/wiki/Thread_(computing)#Kernel_threads [AST Explainer]: Explainer.md -[Lift and Lower Definitions]: Explainer.md#canonical-definitions -[Lifted]: Explainer.md#canonical-definitions +[Lift and Lower Definitions]: Explainer.md#canonical-abi +[`canon lift`]: Explainer.md#canonical-abi +[Lifted]: Explainer.md#canonical-abi [Canonical Built-in]: Explainer.md#canonical-built-ins -[`task.return`]: Explainer.md#-taskreturn -[`task.wait`]: Explainer.md#-taskwait -[`thread.spawn`]: Explainer.md#-threadspawn +[`canon context.get`]: Explainer.md#-contextget +[`canon backpressure.set`]: Explainer.md#-backpressureset +[`canon task.return`]: Explainer.md#-taskreturn +[`canon waitable-set.wait`]: Explainer.md#-waitable-setwait +[`canon waitable-set.poll`]: Explainer.md#-waitable-setpoll +[`canon thread.spawn`]: Explainer.md#-threadspawn [ESM-integration]: Explainer.md#ESM-integration [Canonical ABI Explainer]: CanonicalABI.md [`canon_lift`]: CanonicalABI.md#canon-lift -[`canon_lift`]: CanonicalABI.md#canon-lift +[`unpack_callback_result`]: CanonicalABI.md#canon-lift [`canon_lower`]: CanonicalABI.md#canon-lower -[`canon_task_wait`]: CanonicalABI.md#-canon-taskwait -[`canon_task_backpressure`]: CanonicalABI.md#-canon-taskbackpressure +[`canon_context_get`]: CanonicalABI.md#-canon-contextget +[`canon_backpressure_set`]: CanonicalABI.md#-canon-backpressureset +[`canon_waitable_set_wait`]: CanonicalABI.md#-canon-waitable-setwait 
[`canon_task_return`]: CanonicalABI.md#-canon-taskreturn [`Task`]: CanonicalABI.md#task-state [`Task.enter`]: CanonicalABI.md#task-state +[`Task.wait`]: CanonicalABI.md#task-state [`Waitable`]: CanonicalABI.md#waitable-state [`Subtask`]: CanonicalABI.md#subtask-state [Stream State]: CanonicalABI.md#stream-state @@ -641,6 +752,8 @@ comes after: [stack-switching]: https://github.com/WebAssembly/stack-switching/ [JSPI]: https://github.com/WebAssembly/js-promise-integration/ [shared-everything-threads]: https://github.com/webAssembly/shared-everything-threads +[memory64]: https://github.com/webAssembly/memory64 +[wasm-gc]: https://github.com/WebAssembly/gc/blob/main/proposals/gc/MVP.md [WASI Preview 3]: https://github.com/WebAssembly/WASI/tree/main/wasip2#looking-forward-to-preview-3 [`wasi:http/handler.handle`]: https://github.com/WebAssembly/wasi-http/blob/main/wit-0.3.0-draft/handler.wit diff --git a/design/mvp/Binary.md b/design/mvp/Binary.md index 6dc931dd..52d6afdd 100644 --- a/design/mvp/Binary.md +++ b/design/mvp/Binary.md @@ -290,11 +290,11 @@ canon ::= 0x00 0x00 f: opts: ft: => (canon lift | 0x04 rt: => (canon resource.rep rt (core func)) | 0x05 ft: => (canon thread.spawn ft (core func)) ๐Ÿงต | 0x06 => (canon thread.available_parallelism (core func)) ๐Ÿงต - | 0x08 => (canon task.backpressure (core func)) ๐Ÿ”€ + | 0x08 => (canon backpressure.set (core func)) ๐Ÿ”€ | 0x09 rs: opts: => (canon task.return rs opts (core func)) ๐Ÿ”€ - | 0x0a async?:? m: => (canon task.wait async? (memory m) (core func)) ๐Ÿ”€ - | 0x0b async?:? m: => (canon task.poll async? (memory m) (core func)) ๐Ÿ”€ - | 0x0c async?:? => (canon task.yield async? (core func)) ๐Ÿ”€ + | 0x0a 0x7f i: => (canon context.get i32 i (core func)) ๐Ÿ”€ + | 0x0b 0x7f i: => (canon context.set i32 i (core func)) ๐Ÿ”€ + | 0x0c async?:? => (canon yield async? 
(core func)) ๐Ÿ”€ | 0x0d => (canon subtask.drop (core func)) ๐Ÿ”€ | 0x0e t: => (canon stream.new t (core func)) ๐Ÿ”€ | 0x0f t: opts: => (canon stream.read t opts (core func)) ๐Ÿ”€ @@ -313,6 +313,11 @@ canon ::= 0x00 0x00 f: opts: ft: => (canon lift | 0x1c opts: => (canon error-context.new opts (core func)) ๐Ÿ”€ | 0x1d opts: => (canon error-context.debug-message opts (core func)) ๐Ÿ”€ | 0x1e => (canon error-context.drop (core func)) ๐Ÿ”€ + | 0x1f => (canon waitable-set.new (core func)) ๐Ÿ”€ + | 0x20 async?:? m: => (canon waitable-set.wait async? (memory m) (core func)) ๐Ÿ”€ + | 0x21 async?:? m: => (canon waitable-set.poll async? (memory m) (core func)) ๐Ÿ”€ + | 0x22 => (canon waitable-set.drop (core func)) ๐Ÿ”€ + | 0x23 => (canon waitable.join (core func)) ๐Ÿ”€ async? ::= 0x00 => | 0x01 => async opts ::= opt*:vec() => opt* diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index f209c3db..6e9b1e22 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -36,11 +36,16 @@ being specified here. 
* [`canon resource.new`](#canon-resourcenew) * [`canon resource.drop`](#canon-resourcedrop) * [`canon resource.rep`](#canon-resourcerep) - * [`canon task.backpressure`](#-canon-taskbackpressure) ๐Ÿ”€ + * [`canon context.get`](#-canon-contextget) ๐Ÿ”€ + * [`canon context.set`](#-canon-contextset) ๐Ÿ”€ + * [`canon backpressure.set`](#-canon-backpressureset) ๐Ÿ”€ * [`canon task.return`](#-canon-taskreturn) ๐Ÿ”€ - * [`canon task.wait`](#-canon-taskwait) ๐Ÿ”€ - * [`canon task.poll`](#-canon-taskpoll) ๐Ÿ”€ - * [`canon task.yield`](#-canon-taskyield) ๐Ÿ”€ + * [`canon yield`](#-canon-yield) ๐Ÿ”€ + * [`canon waitable-set.new`](#-canon-waitable-setnew) ๐Ÿ”€ + * [`canon waitable-set.wait`](#-canon-waitable-setwait) ๐Ÿ”€ + * [`canon waitable-set.poll`](#-canon-waitable-setpoll) ๐Ÿ”€ + * [`canon waitable-set.drop`](#-canon-waitable-setdrop) ๐Ÿ”€ + * [`canon waitable.join`](#-canon-waitablejoin) ๐Ÿ”€ * [`canon subtask.drop`](#-canon-subtaskdrop) ๐Ÿ”€ * [`canon {stream,future}.new`](#-canon-streamfuturenew) ๐Ÿ”€ * [`canon {stream,future}.{read,write}`](#-canon-streamfuturereadwrite) ๐Ÿ”€ @@ -159,6 +164,7 @@ behavior and enforce invariants. class ComponentInstance: resources: Table[ResourceHandle] waitables: Table[Waitable] + waitable_sets: Table[WaitableSet] error_contexts: Table[ErrorContext] num_tasks: int may_leave: bool @@ -170,6 +176,7 @@ class ComponentInstance: def __init__(self): self.resources = Table[ResourceHandle]() self.waitables = Table[Waitable]() + self.waitable_sets = Table[WaitableSet]() self.error_contexts = Table[ErrorContext]() self.num_tasks = 0 self.may_leave = True @@ -194,7 +201,7 @@ class Table(Generic[ElemT]): array: list[Optional[ElemT]] free: list[int] - MAX_LENGTH = 2**30 - 1 + MAX_LENGTH = 2**28 - 1 def __init__(self): self.array = [None] @@ -390,6 +397,26 @@ that do all the heavy lifting are shared with function parameter/result lifting and lowering and defined below. 
+#### Context-Local Storage + +TODO +```python +class ContextLocalStorage: + LENGTH = 2 + array: list[int] + + def __init__(self): + self.array = [0] * ContextLocalStorage.LENGTH + + def set(self, i, v): + assert(types_match_values(['i32'], [v])) + self.array[i] = v + + def get(self, i): + return self.array[i] +``` + + #### Task State A `Task` object is created for each call to `canon_lift` and is implicitly @@ -404,9 +431,9 @@ class Task: caller: Optional[Task] on_return: Optional[Callable] on_block: Callable[[Awaitable], Awaitable] - waitable_set: WaitableSet num_subtasks: int num_borrows: int + context: ContextLocalStorage def __init__(self, opts, inst, ft, caller, on_return, on_block): self.opts = opts @@ -418,6 +445,7 @@ class Task: self.waitable_set = WaitableSet() self.num_subtasks = 0 self.num_borrows = 0 + self.context = ContextLocalStorage() ``` Using a conservative syntactic analysis of a complete component, an optimizing implementation can statically eliminate fields when a particular feature (such @@ -586,17 +614,6 @@ in `enter`, preventing more pending tasks from being started in the interim. pending_future.set_result(None) ``` -The `Task.wait` and `Task.poll` methods delegate to `WaitableSet`, defined in -the next section. -```python - async def wait(self, sync) -> EventTuple: - return await self.wait_on(sync, self.waitable_set.wait()) - - async def poll(self, sync) -> Optional[EventTuple]: - await self.yield_(sync) - return self.waitable_set.poll() -``` - The `Task.yield_` method is called by `canon task.yield` or, when `callback` is used, when core wasm returns the "yield" code to the event loop. Yielding allows the runtime to switch execution to another task without having to wait @@ -717,7 +734,6 @@ decrement of `num_tasks`. 
def exit(self): assert(Task.current.locked()) trap_if(self.num_subtasks > 0) - self.waitable_set.drop() trap_if(self.on_return) assert(self.num_borrows == 0) trap_if(self.inst.num_tasks == 1 and self.inst.backpressure) @@ -745,19 +761,19 @@ code that produces the events (specifically, in `subtask_event` and `copy_event`). ```python class CallState(IntEnum): - STARTING = 0 - STARTED = 1 - RETURNED = 2 + STARTING = 1 + STARTED = 2 + RETURNED = 3 class EventCode(IntEnum): + NONE = 0 CALL_STARTING = CallState.STARTING CALL_STARTED = CallState.STARTED CALL_RETURNED = CallState.RETURNED - YIELDED = 3 - STREAM_READ = 4 - STREAM_WRITE = 5 - FUTURE_READ = 6 - FUTURE_WRITE = 7 + STREAM_READ = 5 + STREAM_WRITE = 6 + FUTURE_READ = 7 + FUTURE_WRITE = 8 EventTuple = tuple[EventCode, int, int] ``` @@ -850,15 +866,19 @@ polling. class WaitableSet: elems: list[Waitable] maybe_has_pending_event: asyncio.Event + num_waiting: int def __init__(self): self.elems = [] self.maybe_has_pending_event = asyncio.Event() + self.num_waiting = 0 async def wait(self) -> EventTuple: + self.num_waiting += 1 while True: await self.maybe_has_pending_event.wait() if (e := self.poll()): + self.num_waiting -= 1 return e def poll(self) -> Optional[EventTuple]: @@ -873,11 +893,11 @@ class WaitableSet: def drop(self): trap_if(len(self.elems) > 0) + trap_if(self.num_waiting > 0) ``` -The `WaitableSet.drop` method traps if dropped (by the owning `Task`) while it -still contains elements (whose `Waitable.waitable_set` field would become -dangling). This can happen if a task tries to exit in the middle of a stream or -future copy operation. +The `WaitableSet.drop` method traps if dropped while it still contains elements +(whose `Waitable.waitable_set` field would become dangling) or if it is being +waited-upon by another `Task`. 
Note: the `random.shuffle` in `poll` is meant to give runtimes the semantic freedom to schedule delivery of events non-deterministically (e.g., taking into @@ -929,7 +949,6 @@ turn only happens if the call is `async` *and* blocks. In this case, the self.supertask = task self.supertask.num_subtasks += 1 Waitable.__init__(self) - Waitable.join(self, task.waitable_set) return task.inst.waitables.add(self) ``` The `num_subtasks` increment ensures that the parent `Task` cannot `exit` @@ -2700,23 +2719,34 @@ async def canon_lift(opts, inst, ft, callee, caller, on_start, on_return, on_blo [] = await call_and_trap_on_throw(callee, task, flat_args) assert(types_match_values(flat_ft.results, [])) else: - [packed_ctx] = await call_and_trap_on_throw(callee, task, flat_args) - assert(types_match_values(flat_ft.results, [packed_ctx])) - while packed_ctx != 0: - is_yield = bool(packed_ctx & 1) - ctx = packed_ctx & ~1 - if is_yield: - await task.yield_(sync = False) - event, p1, p2 = (EventCode.YIELDED, 0, 0) + [packed] = await call_and_trap_on_throw(callee, task, flat_args) + while True: + code,si = unpack_callback_result(packed) + match code: + case CallbackCode.EXIT: + break + case CallbackCode.WAIT: + s = task.inst.waitable_sets.get(si) + e = await task.wait_on(opts.sync, s.wait()) + case CallbackCode.POLL: + s = task.inst.waitable_sets.get(si) + await task.yield_(opts.sync) + e = s.poll() + case CallbackCode.YIELD: + await task.yield_(opts.sync) + e = None + if e: + event, p1, p2 = e else: - event, p1, p2 = await task.wait(sync = False) - [packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, p1, p2]) + event, p1, p2 = (EventCode.NONE, 0, 0) + [packed] = await call_and_trap_on_throw(opts.callback, task, [event, p1, p2]) task.exit() ``` In the `sync` case, if the `always-task-return` ABI option is *not* set, then `task.return_` will be called by `callee` to return values; otherwise, `task.return_` must be called by `canon_lift`. 
+TODO In the `async` case, there are two sub-cases depending on whether the `callback` `canonopt` was set. When `callback` is present, waiting happens in an "event loop" inside `canon_lift` which also allows yielding (i.e., allowing @@ -2727,6 +2757,24 @@ coroutine) to switch to another task. Thus, `callback` is an optimization for avoiding fiber creation for async languages that don't need it (e.g., JS, Python, C# and Rust). +TODO +```python +class CallbackCode(IntEnum): + EXIT = 0 + WAIT = 1 + POLL = 2 + YIELD = 3 + MAX = 3 + +def unpack_callback_result(packed): + code = packed & 0xf + trap_if(code > CallbackCode.MAX) + assert(packed < 2**32) + assert(Table.MAX_LENGTH < 2**28) + waitable_set_index = packed >> 4 + return (CallbackCode(code), waitable_set_index) +``` + Uncaught Core WebAssembly [exceptions] result in a trap at component boundaries. Thus, if a component wishes to signal an error, it must use some sort of explicit type such as `result` (whose `error` case particular language @@ -2838,9 +2886,9 @@ immediately return control flow back to the `async` caller if `callee` blocks: subtask.finish() return (EventCode(subtask.state), subtaski, 0) subtask.set_event(subtask_event) - assert(0 < subtaski <= Table.MAX_LENGTH < 2**30) - assert(0 <= int(subtask.state) < 2**2) - flat_results = [subtaski | (int(subtask.state) << 30)] + assert(0 < subtaski <= Table.MAX_LENGTH < 2**28) + assert(0 <= int(subtask.state) < 2**4) + flat_results = [int(subtask.state) | (subtaski << 4)] return flat_results ``` @@ -2990,11 +3038,11 @@ Note that the "locally-defined" requirement above ensures that only the component instance defining a resource can access its representation. 
-### ๐Ÿ”€ `canon task.backpressure` +### ๐Ÿ”€ `canon backpressure.set` For a canonical definition: ```wasm -(canon task.backpressure (core func $f)) +(canon backpressure.set (core func $f)) ``` validation specifies: * `$f` is given type `[i32] -> []` @@ -3002,7 +3050,7 @@ validation specifies: Calling `$f` invokes the following function, which sets the `backpressure` flag on the current `ComponentInstance`: ```python -async def canon_task_backpressure(task, flat_args): +async def canon_backpressure_set(task, flat_args): trap_if(task.opts.sync) task.inst.backpressure = bool(flat_args[0]) return [] @@ -3284,7 +3332,6 @@ context-switching overhead. def copy_event(revoke_buffer): revoke_buffer() e.copying = False - e.join(None) return (event_code, i, pack_copy_result(task, buffer, e)) def on_partial_copy(revoke_buffer): e.set_event(partial(copy_event, revoke_buffer)) @@ -3292,7 +3339,6 @@ context-switching overhead. e.set_event(partial(copy_event, revoke_buffer = lambda:())) if e.copy(buffer, on_partial_copy, on_copy_done) != 'done': e.copying = True - e.join(task.waitable_set) return [BLOCKED] return [pack_copy_result(task, buffer, e)] ``` @@ -3533,6 +3579,8 @@ async def canon_error_context_drop(task, i): ### ๐Ÿงต `canon thread.spawn` +TODO: add new about new ContextLocalStorage + For a canonical definition: ```wasm (canon thread.spawn (type $ft) (core func $st)) diff --git a/design/mvp/Explainer.md b/design/mvp/Explainer.md index e1ee6431..af75892c 100644 --- a/design/mvp/Explainer.md +++ b/design/mvp/Explainer.md @@ -1402,11 +1402,16 @@ canon ::= ... | (canon resource.new (core func ?)) | (canon resource.drop async? (core func ?)) | (canon resource.rep (core func ?)) - | (canon task.backpressure (core func ?)) ๐Ÿ”€ + | (canon context.get (core func ?)) ๐Ÿ”€ + | (canon context.set (core func ?)) ๐Ÿ”€ + | (canon backpressure.set (core func ?)) ๐Ÿ”€ | (canon task.return (result )? * (core func ?)) ๐Ÿ”€ - | (canon task.wait async? 
(memory ) (core func ?)) ๐Ÿ”€ - | (canon task.poll async? (memory ) (core func ?)) ๐Ÿ”€ - | (canon task.yield async? (core func ?)) ๐Ÿ”€ + | (canon yield async? (core func ?)) ๐Ÿ”€ + | (canon waitable-set.new (core func ?)) ๐Ÿ”€ + | (canon waitable-set.wait async? (memory ) (core func ?)) ๐Ÿ”€ + | (canon waitable-set.poll async? (memory ) (core func ?)) ๐Ÿ”€ + | (canon waitable-set.drop (core func ?)) ๐Ÿ”€ + | (canon waitable.join (core func ?)) ๐Ÿ”€ | (canon subtask.drop (core func ?)) ๐Ÿ”€ | (canon stream.new (core func ?)) ๐Ÿ”€ | (canon stream.read * (core func ?)) ๐Ÿ”€ @@ -1530,18 +1535,43 @@ transferring ownership of the newly-created resource to the export's caller. See the [async explainer](Async.md) for high-level context and terminology and the [Canonical ABI explainer] for detailed runtime semantics. -###### ๐Ÿ”€ `task.backpressure` +###### ๐Ÿ”€ `context.get` + +| Synopsis | | +| -------------------------- | ------------------ | +| Approximate WIT signature | `func() -> T` | +| Canonical ABI signature | `[] -> [T]` | + +The `context.get` built-in returns the `i`th element of the [current execution +context]'s [context-local storage array]. Validation currently restricts `i` to +be less than 2 and `t` to be `i32`, but will be relaxed in the future. (See +also [`canon_context_get`] in the Canonical ABI explainer for details.) + +###### ๐Ÿ”€ `context.set` + +| Synopsis | | +| -------------------------- | ----------------- | +| Approximate WIT signature | `func(v: T)` | +| Canonical ABI signature | `[T] -> []` | + +The `context.set` built-in sets the `i`th element of the [current execution +context]'s [context-local storage array] to the value `v`. Validation currently +restricts `i` to be less than 2 and `t` to be `i32`, but will be relaxed in the +future. (See also [`canon_context_set`] in the Canonical ABI explainer for +details.) 
+ +###### ๐Ÿ”€ `backpressure.set` | Synopsis | | | -------------------------- | --------------------- | | Approximate WIT signature | `func(enable: bool)` | | Canonical ABI signature | `[enable:i32] -> []` | -The `task.backpressure` built-in allows the async-lifted callee to toggle a +The `backpressure.set` built-in allows the async-lifted callee to toggle a per-component-instance flag that, when set, prevents new incoming export calls to the component (until the flag is unset). This allows the component to exert -[backpressure]. (See also [`canon_task_backpressure`] in the Canonical ABI -explainer.) +[backpressure]. (See also [`canon_backpressure_set`] in the Canonical ABI +explainer for details.) ###### ๐Ÿ”€ `task.return` @@ -1553,24 +1583,52 @@ called, the declared return type and `canonopt`s are checked to exactly match those of the current task. (See also "[Returning]" in the async explainer and [`canon_task_return`] in the Canonical ABI explainer.) -###### ๐Ÿ”€ `task.wait` +###### ๐Ÿ”€ `yield` + +| Synopsis | | +| -------------------------- | ------------------ | +| Approximate WIT signature | `func()` | +| Canonical ABI signature | `[] -> []` | + +The `yield` built-in allows the runtime to switch to other tasks, enabling a +long-running computation to cooperatively interleave execution. If the `async` +immediate is present, the runtime can switch to other tasks in the *same* +component instance, which the calling core wasm must be prepared to handle. If +`async` is not present, only tasks in *other* component instances may be +switched to. (See also [`canon_yield`] in the Canonical ABI explainer for +details.) 
-| Synopsis | | -| -------------------------- | ---------------------------------------- | -| Approximate WIT signature | `func() -> event` | -| Canonical ABI signature | `[payload_addr:i32] -> [event-kind:i32]` | +###### ๐Ÿ”€ `waitable-set.new` -where `event`, `event-kind`, and `payload` are defined in WIT as: +| Synopsis | | +| -------------------------- | ------------------------ | +| Approximate WIT signature | `func() -> waitable-set` | +| Canonical ABI signature | `[] -> [i32]` | + +The `waitable-set.new` built-in returns the `i32` index of a new [waitable +set]. The `waitable-set` type is not a true WIT-level type but instead serves +to document associated built-ins below. Waitable sets start out empty and are +populated explicitly with [waitables] by `waitable.join`. (See also +[`canon_waitable_set_new`] in the Canonical ABI explainer for details.) + +###### ๐Ÿ”€ `waitable-set.wait` + +| Synopsis | | +| -------------------------- | ---------------------------------------------- | +| Approximate WIT signature | `func(s: waitable-set) -> event` | +| Canonical ABI signature | `[s:i32 payload-addr:i32] -> [event-code:i32]` | + +where `event`, `event-code`, and `payload` are defined in WIT as: ```wit record event { - kind: event-kind, + kind: event-code, payload: payload, } -enum event-kind { +enum event-code { + none, call-starting, call-started, call-returned, - yielded, stream-read, stream-write, future-read, @@ -1582,8 +1640,11 @@ record payload { } ``` -The `task.wait` built-in waits for one of the pending events to occur, and then -returns an `event` describing it. +The `waitable-set.wait` built-in waits for any one of the [waitables] in the +given [waitable set] `s` to make progress and then returns an `event` +describing the event. The `event-code` `none` is never returned. Waitable sets +may be `wait`ed upon when empty, in which case the caller will necessarily +block until another task adds a waitable to the set that can make progress. 
If the `async` immediate is present, other tasks in the same component instance can be started (via export call) or resumed while the current task blocks. If @@ -1591,49 +1652,61 @@ can be started (via export call) or resumed while the current task blocks. If code until `wait` returns (however, *other* component instances may execute code in the interim). -In the Canonical ABI, the return value provides the `event-kind`, and the -`payload` value is stored at the address passed as the `payload_addr` -parameter. (See also "[Waiting]" in the async explainer and [`canon_task_wait`] -in the Canonical ABI explainer.) +In the Canonical ABI, the return value provides the `event-code`, and the +`payload` value is stored at the address passed as the `payload-addr` +parameter. (See also [`canon_waitable_set_wait`] in the Canonical ABI explainer +for details.) -###### ๐Ÿ”€ `task.poll` +###### ๐Ÿ”€ `waitable-set.poll` -| Synopsis | | -| -------------------------- | ----------------------------------- | -| Approximate WIT signature | `func() -> option ` | -| Canonical ABI signature | `[event_addr:i32] -> [is_some:i32]` | +| Synopsis | | +| -------------------------- | ---------------------------------------------- | +| Approximate WIT signature | `func(s: waitable-set) -> event` | +| Canonical ABI signature | `[s:i32 payload-addr:i32] -> [event-code:i32]` | -where `event`, `event-kind`, and `payload` are defined as in [`task.wait`](#-taskwait). +where `event`, `event-code`, and `payload` are defined as in +[`waitable-set.wait`](#-waitable-setwait). -The `task.poll` built-in returns either `none` if no event was immediately -available, or `some` containing an event code and payload. `poll` implicitly -performs a `task.yield`, allowing other tasks to be scheduled before `poll` -returns. The `async?` immediate is passed to `task.yield`, determining whether -other tasks in the same component instance may execute. 
+The `waitable-set.poll` built-in returns the `event-code` `none` if no event +was available without blocking. `poll` implicitly performs a `yield`, allowing +other tasks to be scheduled before `poll` returns. The `async?` immediate is +passed to `yield`, determining whether other code in the same component +instance may execute. -In the Canonical ABI, the return value `is_some` holds a boolean value -indicating whether an event was immediately available, and if so, the `event` -value, containing the code and payloads are stored into the buffer pointed to -by `event_addr`. (See also [`canon_task_poll`] n the Canonical ABI explainer.) +The Canonical ABI of `waitable-set.poll` is the same as `waitable-set.wait` +(with the `none` case indicated by returning `0`). (See also +[`canon_waitable_set_poll`] in the Canonical ABI explainer for details.) -###### ๐Ÿ”€ `task.yield` +###### ๐Ÿ”€ `waitable-set.drop` -| Synopsis | | -| -------------------------- | ------------------ | -| Approximate WIT signature | `func()` | -| Canonical ABI signature | `[] -> []` | +| Synopsis | | +| -------------------------- | ------------------------ | +| Approximate WIT signature | `func(s: waitable-set)` | +| Canonical ABI signature | `[s:i32] -> []` | -The `task.yield` built-in allows the runtime to switch to another task, -enabling a long-running computation to cooperatively interleave execution with -other tasks. +The `waitable-set.drop` built-in removes the indicated [waitable set] from the +current instance's table of waitable sets, trapping if the waitable set is not +empty or if another task is concurrently `wait`ing on it. (See also +[`canon_waitable_set_drop`] in the Canonical ABI explainer for details.) -If the `async` immediate is present, other tasks in the same component instance -can be started (via export call) or resumed while the current task blocks and -thus the core wasm calling `task.yield` must be reentrant. 
If `async` is not
-present, only tasks in *other* component instances may execute, and thus the
-calling core wasm will not observe any reentrance.
+###### ๐Ÿ”€ `waitable.join`
+
+| Synopsis | |
+| -------------------------- | ---------------------------------------------------- |
+| Approximate WIT signature | `func(w: waitable, maybe_set: option<waitable-set>)` |
+| Canonical ABI signature | `[w:i32, maybe_set:i32] -> []` |
+
+The `waitable.join` built-in may be called given a [waitable] and an optional
+[waitable set]. `join` first removes `w` from any waitable set that it is a
+member of and then, if `maybe_set` is not `none`, `w` is added to that set.
+Thus, `join` can be used to arbitrarily add, change and remove waitables from
+waitable sets in the same component instance, preserving the invariant that a
+waitable can be in at most one set.

-(See also [`canon_task_yield`] in the Canonical ABI explainer.)
+In the Canonical ABI, `w` is an index into the component instance's [waitables]
+table and can be any type of waitable (`subtask` or
+`{readable,writable}-{stream,future}-end`). (See also [`canon_waitable_join`]
+in the Canonical ABI explainer for details.)
###### ๐Ÿ”€ `subtask.drop` @@ -2702,11 +2775,16 @@ For some use-case-focused, worked examples, see: [Adapter Functions]: FutureFeatures.md#custom-abis-via-adapter-functions [Canonical ABI explainer]: CanonicalABI.md +[`canon_context_get`]: CanonicalABI.md#-canon-contextget +[`canon_context_set`]: CanonicalABI.md#-canon-contextset +[`canon_backpressure_set`]: CanonicalABI.md#-canon-backpressureset [`canon_task_return`]: CanonicalABI.md#-canon-taskreturn -[`canon_task_wait`]: CanonicalABI.md#-canon-taskwait -[`canon_task_poll`]: CanonicalABI.md#-canon-taskpoll -[`canon_task_yield`]: CanonicalABI.md#-canon-taskyield -[`canon_task_backpressure`]: CanonicalABI.md#-canon-taskbackpressure +[`canon_yield`]: CanonicalABI.md#-canon-yield +[`canon_waitable_set_new`]: CanonicalABI.md#-canon-waitable-setnew +[`canon_waitable_set_wait`]: CanonicalABI.md#-canon-waitable-setwait +[`canon_waitable_set_poll`]: CanonicalABI.md#-canon-waitable-setpoll +[`canon_waitable_set_drop`]: CanonicalABI.md#-canon-waitable-setdrop +[`canon_waitable_join`]: CanonicalABI.md#-canon-waitablejoin [`canon_stream_new`]: CanonicalABI.md#-canon-streamfuturenew [`canon_stream_read`]: CanonicalABI.md#-canon-streamfuturereadwrite [`canon_future_read`]: CanonicalABI.md#-canon-streamfuturereadwrite @@ -2728,12 +2806,15 @@ For some use-case-focused, worked examples, see: [Task]: Async.md#task [Current Task]: Async.md#current-task +[Current Execution Context]: Async.md#current-execution-context +[Context-Local Storage Array]: Async.md#current-execution-context [Subtask]: Async.md#subtask [Stream or Future]: Async.md#streams-and-futures [Readable or Writable End]: Async.md#streams-and-futures [Writable End]: Async.md#streams-and-futures [Waiting]: Async.md#waiting [Waitables]: Async.md#waiting +[Waitable Set]: Async.md#waiting [Backpressure]: Async.md#backpressure [Returning]: Async.md#returning diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index 
7fe65ef5..460981f2 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -206,6 +206,7 @@ class CanonicalOptions: class ComponentInstance: resources: Table[ResourceHandle] waitables: Table[Waitable] + waitable_sets: Table[WaitableSet] error_contexts: Table[ErrorContext] num_tasks: int may_leave: bool @@ -217,6 +218,7 @@ class ComponentInstance: def __init__(self): self.resources = Table[ResourceHandle]() self.waitables = Table[Waitable]() + self.waitable_sets = Table[WaitableSet]() self.error_contexts = Table[ErrorContext]() self.num_tasks = 0 self.may_leave = True @@ -233,7 +235,7 @@ class Table(Generic[ElemT]): array: list[Optional[ElemT]] free: list[int] - MAX_LENGTH = 2**30 - 1 + MAX_LENGTH = 2**28 - 1 def __init__(self): self.array = [None] @@ -336,6 +338,22 @@ def write(self, vs): self.ptr += len(vs) * elem_size(self.t) self.progress += len(vs) +#### Context-Local storage + +class ContextLocalStorage: + LENGTH = 2 + array: list[int] + + def __init__(self): + self.array = [0] * ContextLocalStorage.LENGTH + + def set(self, i, v): + assert(types_match_values(['i32'], [v])) + self.array[i] = v + + def get(self, i): + return self.array[i] + #### Task State class Task: @@ -345,9 +363,9 @@ class Task: caller: Optional[Task] on_return: Optional[Callable] on_block: Callable[[Awaitable], Awaitable] - waitable_set: WaitableSet num_subtasks: int num_borrows: int + context: ContextLocalStorage def __init__(self, opts, inst, ft, caller, on_return, on_block): self.opts = opts @@ -356,9 +374,9 @@ def __init__(self, opts, inst, ft, caller, on_return, on_block): self.caller = caller self.on_return = on_return self.on_block = on_block - self.waitable_set = WaitableSet() self.num_subtasks = 0 self.num_borrows = 0 + self.context = ContextLocalStorage() current = asyncio.Lock() @@ -418,13 +436,6 @@ def maybe_start_pending_task(self): self.inst.starting_pending_task = True pending_future.set_result(None) - async def wait(self, sync) 
-> EventTuple: - return await self.wait_on(sync, self.waitable_set.wait()) - - async def poll(self, sync) -> Optional[EventTuple]: - await self.yield_(sync) - return self.waitable_set.poll() - async def yield_(self, sync): await self.wait_on(sync, asyncio.sleep(0)) @@ -471,7 +482,6 @@ def return_(self, flat_results): def exit(self): assert(Task.current.locked()) trap_if(self.num_subtasks > 0) - self.waitable_set.drop() trap_if(self.on_return) assert(self.num_borrows == 0) trap_if(self.inst.num_tasks == 1 and self.inst.backpressure) @@ -485,19 +495,19 @@ def exit(self): #### Waitable State class CallState(IntEnum): - STARTING = 0 - STARTED = 1 - RETURNED = 2 + STARTING = 1 + STARTED = 2 + RETURNED = 3 class EventCode(IntEnum): + NONE = 0 CALL_STARTING = CallState.STARTING CALL_STARTED = CallState.STARTED CALL_RETURNED = CallState.RETURNED - YIELDED = 3 - STREAM_READ = 4 - STREAM_WRITE = 5 - FUTURE_READ = 6 - FUTURE_WRITE = 7 + STREAM_READ = 5 + STREAM_WRITE = 6 + FUTURE_READ = 7 + FUTURE_WRITE = 8 EventTuple = tuple[EventCode, int, int] @@ -547,15 +557,19 @@ def drop(self): class WaitableSet: elems: list[Waitable] maybe_has_pending_event: asyncio.Event + num_waiting: int def __init__(self): self.elems = [] self.maybe_has_pending_event = asyncio.Event() + self.num_waiting = 0 async def wait(self) -> EventTuple: + self.num_waiting += 1 while True: await self.maybe_has_pending_event.wait() if (e := self.poll()): + self.num_waiting -= 1 return e def poll(self) -> Optional[EventTuple]: @@ -570,6 +584,7 @@ def poll(self) -> Optional[EventTuple]: def drop(self): trap_if(len(self.elems) > 0) + trap_if(self.num_waiting > 0) #### Subtask State @@ -590,7 +605,6 @@ def add_to_waitables(self, task): self.supertask = task self.supertask.num_subtasks += 1 Waitable.__init__(self) - Waitable.join(self, task.waitable_set) return task.inst.waitables.add(self) def add_lender(self, lending_handle): @@ -1722,19 +1736,44 @@ async def canon_lift(opts, inst, ft, callee, caller, on_start, 
on_return, on_blo [] = await call_and_trap_on_throw(callee, task, flat_args) assert(types_match_values(flat_ft.results, [])) else: - [packed_ctx] = await call_and_trap_on_throw(callee, task, flat_args) - assert(types_match_values(flat_ft.results, [packed_ctx])) - while packed_ctx != 0: - is_yield = bool(packed_ctx & 1) - ctx = packed_ctx & ~1 - if is_yield: - await task.yield_(sync = False) - event, p1, p2 = (EventCode.YIELDED, 0, 0) + [packed] = await call_and_trap_on_throw(callee, task, flat_args) + while True: + code,si = unpack_callback_result(packed) + match code: + case CallbackCode.EXIT: + break + case CallbackCode.WAIT: + s = task.inst.waitable_sets.get(si) + e = await task.wait_on(opts.sync, s.wait()) + case CallbackCode.POLL: + s = task.inst.waitable_sets.get(si) + await task.yield_(opts.sync) + e = s.poll() + case CallbackCode.YIELD: + await task.yield_(opts.sync) + e = None + if e: + event, p1, p2 = e else: - event, p1, p2 = await task.wait(sync = False) - [packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, p1, p2]) + event, p1, p2 = (EventCode.NONE, 0, 0) + [packed] = await call_and_trap_on_throw(opts.callback, task, [event, p1, p2]) task.exit() +class CallbackCode(IntEnum): + EXIT = 0 + WAIT = 1 + POLL = 2 + YIELD = 3 + MAX = 3 + +def unpack_callback_result(packed): + code = packed & 0xf + trap_if(code > CallbackCode.MAX) + assert(packed < 2**32) + assert(Table.MAX_LENGTH < 2**28) + waitable_set_index = packed >> 4 + return (CallbackCode(code), waitable_set_index) + async def call_and_trap_on_throw(callee, task, args): try: return await callee(task, args) @@ -1788,9 +1827,9 @@ def subtask_event(): subtask.finish() return (EventCode(subtask.state), subtaski, 0) subtask.set_event(subtask_event) - assert(0 < subtaski <= Table.MAX_LENGTH < 2**30) - assert(0 <= int(subtask.state) < 2**2) - flat_results = [subtaski | (int(subtask.state) << 30)] + assert(0 < subtaski <= Table.MAX_LENGTH < 2**28) + assert(0 <= int(subtask.state) < 
2**4) + flat_results = [int(subtask.state) | (subtaski << 4)] return flat_results @@ -1836,9 +1875,24 @@ async def canon_resource_rep(rt, task, i): trap_if(h.rt is not rt) return [h.rep] -### ๐Ÿ”€ `canon task.backpressure` +### ๐Ÿ”€ `canon context.get` + +async def canon_context_get(t, i, task): + assert(t == 'i32') + assert(i < ContextLocalStorage.LENGTH) + return [task.context.get(i)] -async def canon_task_backpressure(task, flat_args): +### ๐Ÿ”€ `canon context.set` + +async def canon_context_set(t, i, task, v): + assert(t == 'i32') + assert(i < ContextLocalStorage.LENGTH) + task.context.set(i, v) + return [] + +### ๐Ÿ”€ `canon backpressure.set` + +async def canon_backpressure_set(task, flat_args): trap_if(task.opts.sync) task.inst.backpressure = bool(flat_args[0]) return [] @@ -1853,35 +1907,65 @@ async def canon_task_return(task, result_type, opts, flat_args): task.return_(flat_args) return [] -### ๐Ÿ”€ `canon task.wait` +### ๐Ÿ”€ `canon yield` -async def canon_task_wait(sync, mem, task, ptr): +async def canon_yield(sync, task): trap_if(not task.inst.may_leave) trap_if(task.opts.callback and not sync) - event, p1, p2 = await task.wait(sync) + await task.yield_(sync) + return [] + +### ๐Ÿ”€ `canon waitable-set.new` + +async def canon_waitable_set_new(task): + trap_if(not task.inst.may_leave) + return [ task.inst.waitable_sets.add(WaitableSet()) ] + +### ๐Ÿ”€ `canon waitable-set.wait` + +async def canon_waitable_set_wait(sync, mem, task, si, ptr): + trap_if(not task.inst.may_leave) + trap_if(task.opts.callback and not sync) + s = task.inst.waitable_sets.get(si) + e = await task.wait_on(sync, s.wait()) + return unpack_event(mem, task, ptr, e) + +def unpack_event(mem, task, ptr, e: EventTuple): + event, p1, p2 = e cx = LiftLowerContext(CanonicalOptions(memory = mem), task.inst) store(cx, p1, U32Type(), ptr) store(cx, p2, U32Type(), ptr + 4) return [event] -### ๐Ÿ”€ `canon task.poll` +### ๐Ÿ”€ `canon waitable-set.poll` -async def canon_task_poll(sync, mem, task, 
ptr): +async def canon_waitable_set_poll(sync, mem, task, si, ptr): trap_if(not task.inst.may_leave) trap_if(task.opts.callback and not sync) - ret = await task.poll(sync) - if ret is None: - return [0] - cx = LiftLowerContext(CanonicalOptions(memory = mem), task.inst) - store(cx, ret, TupleType([U32Type(), U32Type(), U32Type()]), ptr) - return [1] + s = task.inst.waitable_sets.get(si) + await task.yield_(sync) + if (e := s.poll()): + return unpack_event(mem, task, ptr, e) + return [EventCode.NONE] -### ๐Ÿ”€ `canon task.yield` +### ๐Ÿ”€ `canon waitable-set.drop` -async def canon_task_yield(sync, task): +async def canon_waitable_set_drop(task, i): trap_if(not task.inst.may_leave) - trap_if(task.opts.callback and not sync) - await task.yield_(sync) + s = task.inst.waitable_sets.remove(i) + s.drop() + return [] + +### ๐Ÿ”€ `canon waitable.join` + +async def canon_waitable_join(task, wi, si): + trap_if(not task.inst.may_leave) + w = task.inst.waitables.get(wi) + if si == 0: + w.join(None) + else: + s = task.inst.waitable_sets.get(si) + w.join(s) return [] ### ๐Ÿ”€ `canon subtask.drop` @@ -1948,7 +2032,6 @@ def on_partial_copy(revoke_buffer): def copy_event(revoke_buffer): revoke_buffer() e.copying = False - e.join(None) return (event_code, i, pack_copy_result(task, buffer, e)) def on_partial_copy(revoke_buffer): e.set_event(partial(copy_event, revoke_buffer)) @@ -1956,7 +2039,6 @@ def on_copy_done(): e.set_event(partial(copy_event, revoke_buffer = lambda:())) if e.copy(buffer, on_partial_copy, on_copy_done) != 'done': e.copying = True - e.join(task.waitable_set) return [BLOCKED] return [pack_copy_result(task, buffer, e)] diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index babd7524..f0579a46 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -61,7 +61,7 @@ def mk_tup_rec(x): return { str(i):mk_tup_rec(v) for i,v in enumerate(a) } def unpack_lower_result(ret): - return (ret & ~(3 
<< 30), ret >> 30) + return (ret & 0xf, ret >> 4) def fail(msg): raise BaseException(msg) @@ -535,9 +535,9 @@ async def core_eager_producer(task, args): fut1 = asyncio.Future() async def core_toggle(task, args): assert(len(args) == 0) - [] = await canon_task_backpressure(task, [1]) + [] = await canon_backpressure_set(task, [1]) await task.on_block(fut1) - [] = await canon_task_backpressure(task, [0]) + [] = await canon_backpressure_set(task, [0]) [] = await canon_task_return(task, [], producer_opts, []) return [] toggle_callee = partial(canon_lift, producer_opts, producer_inst, toggle_ft, core_toggle) @@ -554,40 +554,50 @@ async def core_blocking_producer(task, args): return [] blocking_callee = partial(canon_lift, producer_opts, producer_inst, blocking_ft, core_blocking_producer) - consumer_heap = Heap(10) + consumer_heap = Heap(20) consumer_opts = mk_opts(consumer_heap.memory) consumer_opts.sync = False async def consumer(task, args): [b] = args + [seti] = await canon_waitable_set_new(task) ptr = consumer_heap.realloc(0, 0, 1, 1) [ret] = await canon_lower(consumer_opts, eager_ft, eager_callee, task, [ptr]) assert(ret == 0) u8 = consumer_heap.memory[ptr] assert(u8 == 43) [ret] = await canon_lower(consumer_opts, toggle_ft, toggle_callee, task, []) - subi,state = unpack_lower_result(ret) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) assert(state == CallState.STARTED) + [] = await canon_waitable_join(task, subi1, seti) retp = ptr consumer_heap.memory[retp] = 13 [ret] = await canon_lower(consumer_opts, blocking_ft, blocking_callee, task, [83, retp]) - assert(ret == (2 | (CallState.STARTING << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTING) assert(consumer_heap.memory[retp] == 13) + [] = await canon_waitable_join(task, subi2, seti) fut1.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + waitretp = consumer_heap.realloc(0, 0, 8, 4) + [event] = await 
canon_waitable_set_wait(False, consumer_heap.memory, task, seti, waitretp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 1) - [] = await canon_subtask_drop(task, callidx) - event, callidx, _ = await task.wait(sync = True) + assert(consumer_heap.memory[waitretp] == subi1) + [] = await canon_subtask_drop(task, subi1) + + [event] = await canon_waitable_set_wait(True, consumer_heap.memory, task, seti, waitretp) assert(event == EventCode.CALL_STARTED) - assert(callidx == 2) + assert(consumer_heap.memory[waitretp] == subi2) assert(consumer_heap.memory[retp] == 13) fut2.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, waitretp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 2) + assert(consumer_heap.memory[waitretp] == subi2) assert(consumer_heap.memory[retp] == 44) - [] = await canon_subtask_drop(task, callidx) + [] = await canon_subtask_drop(task, subi2) fut3.set_result(None) assert(await task.on_block(fut4) == "done") @@ -605,13 +615,19 @@ async def dtor(task, args): assert(i == 1) assert(dtor_value is None) [ret] = await canon_resource_drop(rt, False, task, 1) - assert(ret == (2 | (CallState.STARTED << 30))) + state,dtorsubi = unpack_lower_result(ret) + assert(dtorsubi == 2) + assert(state == CallState.STARTED) assert(dtor_value is None) dtor_fut.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + [] = await canon_waitable_join(task, dtorsubi, seti) + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, waitretp) assert(event == CallState.RETURNED) - assert(callidx == 2) - [] = await canon_subtask_drop(task, callidx) + assert(consumer_heap.memory[waitretp] == dtorsubi) + assert(dtor_value == 50) + [] = await canon_subtask_drop(task, dtorsubi) + [] = await canon_waitable_set_drop(task, seti) [] = await canon_task_return(task, [U8Type()], consumer_opts, [42]) return [] @@ -655,36 +671,52 @@ 
async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(opts, producer_ft, producer1, task, []) - assert(ret == (1 | (CallState.STARTED << 30))) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) + assert(state == CallState.STARTED) [ret] = await canon_lower(opts, producer_ft, producer2, task, []) - assert(ret == (2 | (CallState.STARTED << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTED) + + [seti] = await canon_waitable_set_new(task) + assert(seti == 1) + [] = await canon_waitable_join(task, subi1, seti) + [] = await canon_waitable_join(task, subi2, seti) fut1.set_result(None) - return [42] + [] = await canon_context_set('i32', 0, task, 42) + return [definitions.CallbackCode.WAIT|(seti << 4)] async def callback(task, args): - assert(len(args) == 4) - if args[0] == 42: - assert(args[1] == EventCode.CALL_RETURNED) - assert(args[2] == 1) - assert(args[3] == 0) - await canon_subtask_drop(task, 1) - return [53] - elif args[0] == 52: - assert(args[1] == EventCode.YIELDED) - assert(args[2] == 0) - assert(args[3] == 0) - fut2.set_result(None) - return [62] - else: - assert(args[0] == 62) - assert(args[1] == EventCode.CALL_RETURNED) - assert(args[2] == 2) - assert(args[3] == 0) - await canon_subtask_drop(task, 2) - [] = await canon_task_return(task, [U32Type()], opts, [83]) - return [0] + assert(len(args) == 3) + seti = 1 + [ctx] = await canon_context_get('i32', 0, task) + match ctx: + case 42: + assert(args[0] == EventCode.CALL_RETURNED) + assert(args[1] == 1) + assert(args[2] == 0) + await canon_subtask_drop(task, 1) + [] = await canon_context_set('i32', 0, task, 52) + return [definitions.CallbackCode.YIELD] + case 52: + assert(args[0] == EventCode.NONE) + assert(args[1] == 0) + assert(args[2] == 0) + fut2.set_result(None) + [] = await canon_context_set('i32', 0, task, 62) + return [definitions.CallbackCode.WAIT|(seti<<4)] + case 62: + assert(args[0] == EventCode.CALL_RETURNED) + 
assert(args[1] == 2) + assert(args[2] == 0) + await canon_subtask_drop(task, 2) + [] = await canon_task_return(task, [U32Type()], opts, [83]) + return [definitions.CallbackCode.EXIT] + case _: + assert(False) consumer_inst = ComponentInstance() def on_start(): return [] @@ -727,7 +759,8 @@ async def producer2_core(task, args): producer1 = partial(canon_lift, producer_opts, producer_inst, producer_ft, producer1_core) producer2 = partial(canon_lift, producer_opts, producer_inst, producer_ft, producer2_core) - consumer_opts = mk_opts() + consumer_heap = Heap(20) + consumer_opts = mk_opts(consumer_heap.memory) consumer_opts.sync = False consumer_ft = FuncType([],[U8Type()]) @@ -735,31 +768,40 @@ async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(consumer_opts, producer_ft, producer1, task, []) - assert(ret == (1 | (CallState.STARTED << 30))) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) + assert(state == CallState.STARTED) [ret] = await canon_lower(consumer_opts, producer_ft, producer2, task, []) - assert(ret == (2 | (CallState.STARTING << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTING) - assert(await task.poll(sync = False) is None) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, subi1, seti) + [] = await canon_waitable_join(task, subi2, seti) fut.set_result(None) assert(producer1_done == False) - event, callidx, _ = await task.wait(sync = False) + + retp = consumer_heap.realloc(0,0,8,4) + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 1) - await canon_subtask_drop(task, callidx) + assert(consumer_heap.memory[retp] == subi1) + await canon_subtask_drop(task, subi1) assert(producer1_done == True) assert(producer2_done == False) - await canon_task_yield(False, task) + await canon_yield(False, task) assert(producer2_done == True) - 
event, callidx, _ = await task.poll(sync = False) + + [event] = await canon_waitable_set_poll(False, consumer_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 2) - await canon_subtask_drop(task, callidx) + assert(consumer_heap.memory[retp] == subi2) + await canon_subtask_drop(task, subi2) assert(producer2_done == True) - assert(await task.poll(sync = True) is None) + [] = await canon_waitable_set_drop(task, seti) await canon_task_return(task, [U8Type()], consumer_opts, [83]) return [] @@ -786,9 +828,9 @@ async def test_async_backpressure(): producer1_done = False async def producer1_core(task, args): nonlocal producer1_done - await canon_task_backpressure(task, [1]) + await canon_backpressure_set(task, [1]) await task.on_block(fut) - await canon_task_backpressure(task, [0]) + await canon_backpressure_set(task, [0]) await canon_task_return(task, [], producer_opts, []) producer1_done = True return [] @@ -804,37 +846,46 @@ async def producer2_core(task, args): producer1 = partial(canon_lift, producer_opts, producer_inst, producer_ft, producer1_core) producer2 = partial(canon_lift, producer_opts, producer_inst, producer_ft, producer2_core) - consumer_opts = CanonicalOptions() - consumer_opts.sync = False + consumer_heap = Heap(20) + consumer_opts = mk_opts(consumer_heap.memory, sync = False) consumer_ft = FuncType([],[U8Type()]) async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(consumer_opts, producer_ft, producer1, task, []) - assert(ret == (1 | (CallState.STARTED << 30))) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) + assert(state == CallState.STARTED) [ret] = await canon_lower(consumer_opts, producer_ft, producer2, task, []) - assert(ret == (2 | (CallState.STARTING << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTING) - assert(await task.poll(sync = False) is None) + [seti] = await canon_waitable_set_new(task) + [] = await 
canon_waitable_join(task, subi1, seti) + [] = await canon_waitable_join(task, subi2, seti) fut.set_result(None) assert(producer1_done == False) assert(producer2_done == False) - event, callidx, _ = await task.wait(sync = False) + + retp = consumer_heap.realloc(0,0,8,4) + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 1) + assert(consumer_heap.memory[retp] == subi1) assert(producer1_done == True) - event, callidx, _ = await task.poll(sync = False) + + [event] = await canon_waitable_set_poll(False, consumer_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 2) + assert(consumer_heap.memory[retp] == subi2) assert(producer2_done == True) - await canon_subtask_drop(task, 1) - await canon_subtask_drop(task, 2) + await canon_subtask_drop(task, subi1) + await canon_subtask_drop(task, subi2) - assert(await task.poll(sync = False) is None) + [] = await canon_waitable_set_drop(task, seti) await canon_task_return(task, [U8Type()], consumer_opts, [84]) return [] @@ -868,26 +919,40 @@ async def core_hostcall_pre(fut, task, args): core_hostcall2 = partial(core_hostcall_pre, fut2) hostcall2 = partial(canon_lift, hostcall_opts, hostcall_inst, ft, core_hostcall2) - lower_opts = mk_opts() + lower_heap = Heap(20) + lower_opts = mk_opts(lower_heap.memory) lower_opts.sync = False async def core_func(task, args): [ret] = await canon_lower(lower_opts, ft, hostcall1, task, []) - assert(ret == (1 | (CallState.STARTED << 30))) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) + assert(state == CallState.STARTED) [ret] = await canon_lower(lower_opts, ft, hostcall2, task, []) - assert(ret == (2 | (CallState.STARTED << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTED) + + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, subi1, seti) + [] = await 
canon_waitable_join(task, subi2, seti) fut1.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + retp = lower_heap.realloc(0,0,8,4) + [event] = await canon_waitable_set_wait(False, lower_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 1) + assert(lower_heap.memory[retp] == subi1) + fut2.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + [event] = await canon_waitable_set_wait(False, lower_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 2) + assert(lower_heap.memory[retp] == subi2) - await canon_subtask_drop(task, 1) - await canon_subtask_drop(task, 2) + await canon_subtask_drop(task, subi1) + await canon_subtask_drop(task, subi2) + await canon_waitable_set_drop(task, seti) return [] @@ -1118,7 +1183,7 @@ async def core_func(task, args): async def test_async_stream_ops(): ft = FuncType([StreamType(U8Type())], [StreamType(U8Type())]) inst = ComponentInstance() - mem = bytearray(20) + mem = bytearray(24) opts = mk_opts(memory=mem, sync=False) sync_opts = mk_opts(memory=mem, sync=True) @@ -1159,13 +1224,15 @@ async def core_func(task, args): [ret] = await canon_stream_read(U8Type(), opts, task, rsi1, 0, 4) assert(ret == definitions.BLOCKED) src_stream.write([1,2,3,4]) - event, p1, p2 = await task.wait(sync = False) + retp = 16 + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi1, seti) + [event] = await canon_waitable_set_wait(False, mem, task, rsi1, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi1) - assert(p2 == 4) + assert(mem[retp+0] == rsi1) + assert(mem[retp+4] == 4) assert(mem[0:4] == b'\x01\x02\x03\x04') [wsi2] = await canon_stream_new(U8Type(), task) - retp = 16 [ret] = await canon_lower(opts, ft, host_import, task, [wsi2, retp]) assert(ret == 0) rsi2 = mem[16] @@ -1173,19 +1240,21 @@ async def core_func(task, args): [ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 
4) assert(ret == definitions.BLOCKED) host_import_incoming.set_remain(100) - event, p1, p2 = await task.wait(sync = False) + [] = await canon_waitable_join(task, wsi2, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi2) - assert(p2 == 4) + assert(mem[retp+0] == wsi2) + assert(mem[retp+4] == 4) [ret] = await canon_stream_read(U8Type(), sync_opts, task, rsi2, 0, 4) assert(ret == 4) [ret] = await canon_stream_write(U8Type(), opts, task, wsi1, 0, 4) assert(ret == definitions.BLOCKED) dst_stream.set_remain(100) - event, p1, p2 = await task.wait(sync = False) + [] = await canon_waitable_join(task, wsi1, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi1) - assert(p2 == 4) + assert(mem[retp+0] == wsi1) + assert(mem[retp+4] == 4) src_stream.write([5,6,7,8]) src_stream.destroy_once_empty() [ret] = await canon_stream_read(U8Type(), opts, task, rsi1, 0, 4) @@ -1199,16 +1268,18 @@ async def core_func(task, args): [] = await canon_stream_close_writable(U8Type(), task, wsi2, 0) [ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4) assert(ret == definitions.BLOCKED) - event, p1, p2 = await task.wait(sync = False) + [] = await canon_waitable_join(task, rsi2, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi2) - assert(p2 == 4) + assert(mem[retp+0] == rsi2) + assert(mem[retp+4] == 4) [ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4) assert(ret == definitions.CLOSED) [] = await canon_stream_close_readable(U8Type(), task, rsi2) [ret] = await canon_stream_write(U8Type(), sync_opts, task, wsi1, 0, 4) assert(ret == 4) [] = await canon_stream_close_writable(U8Type(), task, wsi1, 0) + [] = await canon_waitable_set_drop(task, seti) return [] await canon_lift(opts, inst, ft, core_func, None, on_start, 
on_return, Task.sync_on_block) @@ -1311,10 +1382,13 @@ async def core_func(task, args): [ret] = await canon_stream_read(U8Type(), opts, task, rsi, 0, 4) assert(ret == definitions.BLOCKED) src.write([5,6]) - event, p1, p2 = await task.wait(sync = False) + + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi) - assert(p2 == 2) + assert(mem[retp+0] == rsi) + assert(mem[retp+4] == 2) [] = await canon_stream_close_readable(U8Type(), task, rsi) [wsi] = await canon_stream_new(U8Type(), task) @@ -1327,12 +1401,14 @@ async def core_func(task, args): [ret] = await canon_stream_write(U8Type(), opts, task, wsi, 2, 6) assert(ret == definitions.BLOCKED) dst.set_remain(4) - event, p1, p2 = await task.wait(sync = False) + [] = await canon_waitable_join(task, wsi, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi) - assert(p2 == 4) + assert(mem[retp+0] == wsi) + assert(mem[retp+4] == 4) assert(dst.received == [1,2,3,4,5,6]) [] = await canon_stream_close_writable(U8Type(), task, wsi, 0) + [] = await canon_waitable_set_drop(task, seti) dst.set_remain(100) assert(await dst.consume(100) is None) return [] @@ -1349,7 +1425,7 @@ async def test_wasm_to_wasm_stream(): fut1, fut2, fut3, fut4 = asyncio.Future(), asyncio.Future(), asyncio.Future(), asyncio.Future() inst1 = ComponentInstance() - mem1 = bytearray(10) + mem1 = bytearray(24) opts1 = mk_opts(memory=mem1, sync=False) ft1 = FuncType([], [StreamType(U8Type())]) async def core_func1(task, args): @@ -1373,22 +1449,26 @@ async def core_func1(task, args): fut3.set_result(None) - event, p1, p2 = await task.wait(sync = False) + retp = 16 + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, wsi, seti) + [event] = await canon_waitable_set_wait(False, 
mem1, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi) - assert(p2 == 4) + assert(mem1[retp+0] == wsi) + assert(mem1[retp+4] == 4) fut4.set_result(None) [errctxi] = await canon_error_context_new(opts1, task, 0, 0) [] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi) + [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_drop(task, errctxi) return [] func1 = partial(canon_lift, opts1, inst1, ft1, core_func1) inst2 = ComponentInstance() - heap2 = Heap(10) + heap2 = Heap(24) mem2 = heap2.memory opts2 = mk_opts(memory=heap2.memory, realloc=heap2.realloc, sync=False) ft2 = FuncType([], []) @@ -1396,10 +1476,10 @@ async def core_func2(task, args): assert(not args) [] = await canon_task_return(task, [], opts2, []) - retp = 0 + retp = 16 [ret] = await canon_lower(opts2, ft1, func1, task, [retp]) assert(ret == 0) - rsi = mem2[0] + rsi = mem2[retp] assert(rsi == 1) [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 8) @@ -1407,10 +1487,12 @@ async def core_func2(task, args): fut1.set_result(None) - event, p1, p2 = await task.wait(sync = False) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi, seti) + [event] = await canon_waitable_set_wait(False, mem2, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi) - assert(p2 == 4) + assert(mem2[retp+0] == rsi) + assert(mem2[retp+4] == 4) assert(mem2[0:8] == b'\x01\x02\x03\x04\x00\x00\x00\x00') fut2.set_result(None) @@ -1430,6 +1512,7 @@ async def core_func2(task, args): errctxi = 1 assert(ret == (definitions.CLOSED | errctxi)) [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0) [] = await canon_error_context_drop(task, errctxi) return [] @@ -1439,7 +1522,7 @@ async def core_func2(task, args): async def test_cancel_copy(): inst = ComponentInstance() - mem = 
bytearray(10) + mem = bytearray(24) lower_opts = mk_opts(memory=mem, sync=False) host_ft1 = FuncType([StreamType(U8Type())],[]) @@ -1492,7 +1575,7 @@ async def core_func(task, args): host_sink.set_remain(100) assert(await host_sink.consume(100) is None) - retp = 0 + retp = 16 [ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp]) assert(ret == 0) rsi = mem[retp] @@ -1502,7 +1585,6 @@ async def core_func(task, args): assert(ret == 0) [] = await canon_stream_close_readable(U8Type(), task, rsi) - retp = 0 [ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp]) assert(ret == 0) rsi = mem[retp] @@ -1513,12 +1595,15 @@ async def core_func(task, args): assert(ret == definitions.BLOCKED) host_source.write([7,8]) await asyncio.sleep(0) - event,p1,p2 = await task.wait(sync = False) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi) - assert(p2 == 2) + assert(mem[retp+0] == rsi) + assert(mem[retp+4] == 2) assert(mem[0:2] == b'\x07\x08') [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_waitable_set_drop(task, seti) return [] @@ -1589,7 +1674,7 @@ def close(self, errctx = None): async def test_futures(): inst = ComponentInstance() - mem = bytearray(10) + mem = bytearray(24) lower_opts = mk_opts(memory=mem, sync=False) host_ft1 = FuncType([FutureType(U8Type())],[FutureType(U8Type())]) @@ -1607,7 +1692,7 @@ async def host_func(task, on_start, on_return, on_block): async def core_func(task, args): assert(not args) [wfi] = await canon_future_new(U8Type(), task) - retp = 0 + retp = 16 [ret] = await canon_lower(lower_opts, host_ft1, host_func, task, [wfi, retp]) assert(ret == 0) rfi = mem[retp] @@ -1621,17 +1706,19 @@ async def core_func(task, args): [ret] = await canon_future_write(U8Type(), lower_opts, task, wfi, writep) assert(ret == 1) - 
event,p1,p2 = await task.wait(sync = False) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rfi, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.FUTURE_READ) - assert(p1 == rfi) - assert(p2 == 1) + assert(mem[retp+0] == rfi) + assert(mem[retp+4] == 1) assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi, 0) [] = await canon_future_close_readable(U8Type(), task, rfi) + [] = await canon_waitable_set_drop(task, seti) [wfi] = await canon_future_new(U8Type(), task) - retp = 0 [ret] = await canon_lower(lower_opts, host_ft1, host_func, task, [wfi, retp]) assert(ret == 0) rfi = mem[retp]