Skip to content

Commit

Permalink
Change async CABI, add context.{get,set} and waitable sets
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Jan 21, 2025
1 parent 9f0f1d2 commit f10560f
Show file tree
Hide file tree
Showing 6 changed files with 969 additions and 415 deletions.
213 changes: 154 additions & 59 deletions design/mvp/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ summary of the motivation and animated sketch of the design in action.
* [Sync and Async Functions](#sync-and-async-functions)
* [Task](#task)
* [Current task](#current-task)
* [Context-Local Storage](#context-local-storage)
* [Subtask and Supertask](#subtask-and-supertask)
* [Structured concurrency](#structured-concurrency)
* [Streams and Futures](#streams-and-futures)
Expand Down Expand Up @@ -181,6 +182,38 @@ although there can be multiple live `Task` objects in a component instance,
"the current one" is always clear: it's the one passed to the current function
as a parameter.

### Context-Local Storage

Each task contains a distinct mutable **context-local storage** array. The
current task's context-local storage can be read and written from core wasm
code by calling the [`context.get`] and [`context.set`] built-ins.

The context-local storage array's length is currently fixed to contain exactly
2 `i32`s with the goal of allowing this array to be stored inline in whatever
existing runtime data structure is already efficiently reachable from ambient
compiled wasm code. Because module instantiation is declarative in the
Component Model, the imported `context.{get,set}` built-ins can be inlined by
the core wasm compiler as-if they were instructions, allowing the generated
machine code to be a single load or store. This makes context-local storage a
good place to store the linear-memory shadow stack pointer as well as the
pointer to the struct used to implement [thread-local storage] APIs used by
guest code.

When [memory64] is integrated into the Component Model's Canonical ABI,
`context.{get,set}` will be backwards-compatibly relaxed to allow `i64`
pointers (overlaying the `i32` values like hardware 32/64-bit registers). When
[wasm-gc] is integrated, these integral context values can serve as indices
into guest-managed tables of typed GC references.

When [threads are added](#interaction-with-multi-threading), each thread will
also get its own distinct mutable context-local storage array. This is the
reason why "context-local" storage is not called "task-local" storage (where a
"context" is a finer-grained unit of execution than either a "task" or a
"thread").

For details, see [`context.get`] in the AST explainer and [`canon_context_get`]
in the Canonical ABI explainer.

### Subtask and Supertask

Each component-to-component call necessarily creates a new task in the callee.
Expand Down Expand Up @@ -322,32 +355,52 @@ maintained for streams and futures by the Canonical ABI.
When a component asynchronously lowers an import, it is explicitly requesting
that, if the import blocks, control flow be returned back to the calling task
so that it can do something else. Similarly, if `stream.read` or `stream.write`
would block, they return a "blocked" code so that the caller can continue to
make progress on other things. But eventually, a task will run out of other
things to do and will need to **wait** for progress on one of the task's
subtasks, readable stream ends, writable stream ends, readable future ends or
writable future ends, which are collectively called its **waitables**. While a
task is waiting on its waitables, the Component Model runtime can switch to
other running tasks or start new tasks by invoking exports.

The Canonical ABI provides two ways for a task to wait:
* The task can call the [`task.wait`] built-in to synchronously wait for
progress. This is specified in the Canonical ABI by the [`canon_task_wait`]
function.
* The task can specify a `callback` function (in the `canon lift` definition)
and return to the event loop to wait for notification of progress by a call
to the `callback` function. This is specified in the Canonical ABI by
the `opts.callback` case in [`canon_lift`].
are called asynchronously and would block, they return a "blocked" code so that
the caller can continue to make progress on other things. But eventually, a
task will run out of other things to do and will need to **wait** for progress
on one of the task's subtasks, reads or writes, which are collectively called
its **waitables**. The Canonical ABI Python represents waitables with the
[`Waitable`] base class. While a task is waiting, the Component Model runtime
can switch to other running tasks or start new tasks by invoking exports.

To avoid the O(N) cost of processing an N-ary list of waitables every time a
task needs to wait (which is the classic performance bottleneck of, e.g., POSIX
`select()`), the Canonical ABI allows waitables to be maintained in **waitable
sets** which (like `epoll()`) can be waited upon as a whole for any one of the
member waitables to make progress. Waitable sets are independent of tasks;
tasks can wait on different waitable sets over time and a single waitable set
can be waited upon by multiple tasks at once. Waitable sets are local to a
component instance and cannot be shared across component boundaries.

The Canonical ABI provides two ways for a task to wait on a waitable set:
* Core wasm can pass (the index of) the waitable set as a parameter to the
[`waitable-set.wait`] built-in which blocks and returns the event that
occurred.
* If the task uses a `callback` function, core wasm can return (the index of)
the waitable set as a return value to the event loop, which will block and
then pass the event that occurred as a parameter to the `callback`.

While the two approaches have significant runtime implementation differences
(the former requires [fibers] or a [CPS transform] while the latter only
requires storing a small `i32` "context" in the task), semantically they do the
same thing which, in the Canonical ABI Python code, is factored out into
[`Task`]'s `wait` method. Thus, the difference between `callback` and
non-`callback` is mostly one of optimization, not expressivity.

The Canonical ABI Python represents waitables with a common [`Waitable`]
base class.
requires storing fixed-size context-local storage and [`Task`] state),
semantically they do the same thing which, in the Canonical ABI Python code, is
factored out into the [`Task.wait`] method. Thus, the difference between
`callback` and non-`callback` is one of optimization, not expressivity.

In addition to waiting for an event to occur, a task can also **poll** for
whether an event has already occurred. Polling does not block, but does allow
other tasks to be switched to and executed. Polling is opportunistic, allowing
the servicing of higher-priority events in the middle of longer-running
computations; when there is nothing left to do, a task must *wait*. A task
can poll by either calling [`waitable-set.poll`] or, when using a
`callback`, by returning the Canonical-ABI-defined "poll" code to the event loop
along with (the index of) the waitable set to poll.

Lastly, if a long-running task wants to allow other tasks to execute, without
having any of its own subtasks to wait on, it can **yield**, allowing other
tasks to be scheduled before continuing execution of the current task. A task
can yield by either calling [`yield`] or, when using a `callback`, by returning
the Canonical-ABI-defined "yield" code to the event loop.

### Backpressure

Expand All @@ -356,16 +409,16 @@ export calls can start piling up, each consuming some of the component's finite
private resources (like linear memory), requiring the component to be able to
exert *backpressure* to allow some tasks to finish (and release private
resources) before admitting new async export calls. To do this, a component may
call the `task.backpressure` built-in to set a "backpressure" flag that causes
subsequent export calls to immediately return in the "starting" state without
calling the component's Core WebAssembly code.
call the [`backpressure.set`] built-in to set a component-instance-wide
"backpressure" flag that causes subsequent export calls to immediately return
in the "starting" state without calling the component's Core WebAssembly code.

Once task enables backpressure, it can [wait](#waiting) for existing tasks to
finish and release their associated resources. Thus, a task can choose to
[wait](#waiting) with or without backpressure enabled, depending on whether it
wants to accept new accept new export calls while waiting or not.

See the [`canon_task_backpressure`] function and [`Task.enter`] method in the
See the [`canon_backpressure_set`] function and [`Task.enter`] method in the
Canonical ABI explainer for the setting and implementation of backpressure.

Once a task is allowed to start according to these backpressure rules, its
Expand Down Expand Up @@ -415,18 +468,29 @@ replaced with `...` to focus on the overall flow of function calls.
(import "libc" "mem" (memory 1))
(import "libc" "realloc" (func (param i32 i32 i32 i32) (result i32)))
(import "" "fetch" (func $fetch (param i32 i32) (result i32)))
(import "" "waitable-set.new" (func $new_waitable_set (result i32)))
(import "" "waitable-set.wait" (func $wait (param i32 i32) (result i32)))
(import "" "waitable.join" (func $join (param i32 i32)))
(import "" "task.return" (func $task_return (param i32 i32)))
(import "" "task.wait" (func $wait (param i32) (result i32)))
(global $wsi (mut i32))
(func $start
(global.set $wsi (call $new_waitable_set))
)
(start $start)
(func (export "summarize") (param i32 i32)
...
loop
...
call $fetch ;; pass a pointer-to-string and pointer-to-list-of-bytes outparam
... ;; ... and receive the index of a new async subtask
global.get $wsi
call $join ;; ... and add it to the waitable set
...
end
loop ;; loop as long as there are any subtasks
...
call $task_wait ;; wait for a subtask to make progress
global.get $wsi
call $wait ;; wait for a subtask in the waitable set to make progress
...
end
...
Expand All @@ -438,14 +502,18 @@ replaced with `...` to focus on the overall flow of function calls.
(alias $libc "mem" (core memory $mem))
(alias $libc "realloc" (core func $realloc))
(canon lower $fetch async (memory $mem) (realloc $realloc) (core func $fetch'))
(canon waitable-set.new (core func $new))
(canon waitable-set.wait async (memory $mem) (core func $wait))
(canon waitable.join (core func $join))
(canon task.return (result string) async (memory $mem) (realloc $realloc) (core func $task_return))
(canon task.wait async (memory $mem) (core func $task_wait))
(core instance $main (instantiate $Main (with "" (instance
(export "mem" (memory $mem))
(export "realloc" (func $realloc))
(export "fetch" (func $fetch'))
(export "waitable-set.new" (func $new))
(export "waitable-set.wait" (func $wait))
(export "waitable.join" (func $join))
(export "task.return" (func $task_return))
(export "task.wait" (func $task_wait))
))))
(canon lift (core func $main "summarize")
async (memory $mem) (realloc $realloc)
Expand All @@ -456,25 +524,21 @@ replaced with `...` to focus on the overall flow of function calls.
Because the imported `fetch` function is `canon lower`ed with `async`, its core
function type (shown in the first import of `$Main`) takes pointers to the
parameter and results (which are asynchronously read-from and written-to) and
returns the index of a new subtask. `summarize` calls `task.wait` repeatedly
until all `fetch` subtasks have finished, noting that `task.wait` can return
intermediate progress (as subtasks transition from "starting" to "started" to
"returned") which tell the surrounding core wasm code that it can reclaim the
memory passed arguments or use the results that have now been written to the
outparam memory.
returns the index of a new subtask. `summarize` calls `waitable-set.wait`
repeatedly until all `fetch` subtasks have finished, noting that
`waitable-set.wait` can return intermediate progress (as subtasks transition
from "starting" to "started" to "returned") which tell the surrounding core
wasm code that it can reclaim the memory passed arguments or use the results
that have now been written to the outparam memory.

Because the `summarize` function is `canon lift`ed with `async`, its core
function type has no results, since results are passed out via `task.return`.
It also means that multiple `summarize` calls can be active at once: once the
first call to `task.wait` blocks, the runtime will suspend its callstack
function type has no results; results are passed out via `task.return`. It also
means that multiple `summarize` calls can be active at once: once the first
call to `waitable-set.wait` blocks, the runtime will suspend its callstack
(fiber) and start a new stack for the new call to `summarize`. Thus,
`summarize` must be careful to allocate a separate linear-memory stack in its
entry point, if one is needed, and to save and restore this before and after
calling `task.wait`.

(Note that, for brevity this example ignores the `memory` and `realloc`
immediates required by `canon lift` and `canon lower` to allocate the `list`
param and `string` result, resp.)
entry point and store it in context-local storage (via `context.set`) instead
of simply using a `global`, as in a synchronous function.

This same example can be re-written to use the `callback` immediate (thereby
avoiding the need for fibers) as follows. Note that the internal structure of
Expand All @@ -495,37 +559,55 @@ not externally-visible behavior.
(import "libc" "mem" (memory 1))
(import "libc" "realloc" (func (param i32 i32 i32 i32) (result i32)))
(import "" "fetch" (func $fetch (param i32 i32) (result i32)))
(import "" "waitable-set.new" (func $new_waitable_set (result i32)))
(import "" "waitable.join" (func $join (param i32 i32)))
(import "" "task.return" (func $task_return (param i32 i32)))
(global $wsi (mut i32))
(func $start
(global.set $wsi (call $new_waitable_set))
)
(start $start)
(func (export "summarize") (param i32 i32) (result i32)
...
loop
...
call $fetch ;; pass a pointer-to-string and pointer-to-list-of-bytes outparam
... ;; ... and receive the index of a new async subtask
global.get $wsi
call $join ;; ... and add it to the waitable set
...
end
... ;; return a non-zero "cx" value passed to the next call to "cb"
(i32.or ;; return (WAIT | ($wsi << 4))
(i32.const 2) ;; 2 -> WAIT
(i32.shl
(global.get $wsi)
(i32.const 4)))
)
(func (export "cb") (param $cx i32) (param $event i32) (param $p1 i32) (param $p2 i32)
(func (export "cb") (param $event i32) (param $p1 i32) (param $p2 i32)
...
if ... subtasks remain ...
get_local $cx
return ;; wait for another subtask to make progress
if (result i32) ;; if subtasks remain:
i32.const 2 ;; return WAIT
else ;; if no subtasks remain:
...
call $task_return ;; return the string result (pointer,length)
...
i32.const 0 ;; return EXIT
end
...
call $task_return ;; return the string result (pointer,length)
...
i32.const 0 ;; return zero to signal that this task is done
)
)
(core instance $libc (instantiate $Libc))
(alias $libc "mem" (core memory $mem))
(alias $libc "realloc" (core func $realloc))
(canon lower $fetch async (memory $mem) (realloc $realloc) (core func $fetch'))
(canon waitable-set.new (core func $new))
(canon waitable.join (core func $join))
(canon task.return (result string) async (memory $mem) (realloc $realloc) (core func $task_return))
(core instance $main (instantiate $Main (with "" (instance
(export "mem" (memory $mem))
(export "realloc" (func $realloc))
(export "fetch" (func $fetch'))
(export "waitable-set.new" (func $new))
(export "waitable.join" (func $join))
(export "task.return" (func $task_return))
))))
(canon lift (core func $main "summarize")
Expand All @@ -534,6 +616,9 @@ not externally-visible behavior.
(export "summarize" (func $summarize))
)
```
For an explanation of the bitpacking of the `i32` callback return value,
see [`unpack_callback_result`] in the Canonical ABI explainer.

While this example spawns all the subtasks in the initial call to `summarize`,
subtasks can also be spawned from `cb` (even after the call to `task.return`).
It's also possible for `summarize` to call `task.return` called eagerly in the
Expand Down Expand Up @@ -623,25 +708,33 @@ comes after:
[Event Loop]: https://en.wikipedia.org/wiki/Event_loop
[Structured Concurrency]: https://en.wikipedia.org/wiki/Structured_concurrency
[Unit]: https://en.wikipedia.org/wiki/Unit_type
[Thread-local Storage]: https://en.wikipedia.org/wiki/Thread-local_storage

[AST Explainer]: Explainer.md
[Lift and Lower Definitions]: Explainer.md#canonical-definitions
[Lifted]: Explainer.md#canonical-definitions
[Canonical Built-in]: Explainer.md#canonical-built-ins
[`context.get`]: Explainer.md#-contextget
[`context.set`]: Explainer.md#-contextset
[`backpressure.set`]: Explainer.md#-backpressureset
[`task.return`]: Explainer.md#-taskreturn
[`task.wait`]: Explainer.md#-taskwait
[`yield`]: Explainer.md#-yield
[`waitable-set.wait`]: Explainer.md#-waitable-setwait
[`waitable-set.poll`]: Explainer.md#-waitable-setpoll
[`thread.spawn`]: Explainer.md#-threadspawn
[ESM-integration]: Explainer.md#ESM-integration

[Canonical ABI Explainer]: CanonicalABI.md
[`canon_lift`]: CanonicalABI.md#canon-lift
[`canon_lift`]: CanonicalABI.md#canon-lift
[`unpack_callback_result`]: CanonicalABI.md#canon-lift
[`canon_lower`]: CanonicalABI.md#canon-lower
[`canon_task_wait`]: CanonicalABI.md#-canon-taskwait
[`canon_task_backpressure`]: CanonicalABI.md#-canon-taskbackpressure
[`canon_context_get`]: CanonicalABI.md#-canon-contextget
[`canon_backpressure_set`]: CanonicalABI.md#-canon-backpressureset
[`canon_waitable_set_wait`]: CanonicalABI.md#-canon-waitable-setwait
[`canon_task_return`]: CanonicalABI.md#-canon-taskreturn
[`Task`]: CanonicalABI.md#task-state
[`Task.enter`]: CanonicalABI.md#task-state
[`Task.wait`]: CanonicalABI.md#task-state
[`Waitable`]: CanonicalABI.md#waitable-state
[`Subtask`]: CanonicalABI.md#subtask-state
[Stream State]: CanonicalABI.md#stream-state
Expand All @@ -657,6 +750,8 @@ comes after:
[stack-switching]: https://github.com/WebAssembly/stack-switching/
[JSPI]: https://github.com/WebAssembly/js-promise-integration/
[shared-everything-threads]: https://github.com/webAssembly/shared-everything-threads
[memory64]: https://github.com/webAssembly/memory64
[wasm-gc]: https://github.com/WebAssembly/gc/blob/main/proposals/gc/MVP.md

[WASI Preview 3]: https://github.com/WebAssembly/WASI/tree/main/wasip2#looking-forward-to-preview-3
[`wasi:http/handler.handle`]: https://github.com/WebAssembly/wasi-http/blob/main/wit-0.3.0-draft/handler.wit
Loading

0 comments on commit f10560f

Please sign in to comment.