Skip to content

Commit

Permalink
[WIP] add waitable-set
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Jan 13, 2025
1 parent 5c6ccad commit 67f4526
Show file tree
Hide file tree
Showing 6 changed files with 682 additions and 306 deletions.
245 changes: 179 additions & 66 deletions design/mvp/Async.md

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions design/mvp/Binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,11 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x04 rt:<typeidx> => (canon resource.rep rt (core func))
| 0x05 ft:<typeidx> => (canon thread.spawn ft (core func)) 🧵
| 0x06 => (canon thread.available_parallelism (core func)) 🧵
| 0x08 => (canon task.backpressure (core func)) 🔀
| 0x08 => (canon backpressure.set (core func)) 🔀
| 0x09 rs:<resultlist> opts:<opts> => (canon task.return rs opts (core func)) 🔀
| 0x0a async?:<async>? m:<core:memdix> => (canon task.wait async? (memory m) (core func)) 🔀
| 0x0b async?:<async>? m:<core:memidx> => (canon task.poll async? (memory m) (core func)) 🔀
| 0x0c async?:<async>? => (canon task.yield async? (core func)) 🔀
| 0x0a 0x7f i:<u32> => (canon context.get i32 i (core func)) 🔀
| 0x0b 0x7f i:<u32> => (canon context.set i32 i (core func)) 🔀
| 0x0c async?:<async>? => (canon yield async? (core func)) 🔀
| 0x0d => (canon subtask.drop (core func)) 🔀
| 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
| 0x0f t:<typeidx> opts:<opts> => (canon stream.read t opts (core func)) 🔀
Expand All @@ -313,6 +313,11 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x1c opts:<opts> => (canon error-context.new opts (core func)) 🔀
| 0x1d opts:<opts> => (canon error-context.debug-message opts (core func)) 🔀
| 0x1e => (canon error-context.drop (core func)) 🔀
| 0x1f => (canon waitable-set.new (core func)) 🔀
| 0x20 async?:<async>? m:<core:memidx> => (canon waitable-set.wait async? (memory m) (core func)) 🔀
| 0x21 async?:<async>? m:<core:memidx> => (canon waitable-set.poll async? (memory m) (core func)) 🔀
| 0x22 => (canon waitable-set.drop (core func)) 🔀
| 0x23 => (canon waitable.join (core func)) 🔀
async? ::= 0x00 =>
| 0x01 => async
opts ::= opt*:vec(<canonopt>) => opt*
Expand Down
21 changes: 14 additions & 7 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@ being specified here.
* [`canon resource.new`](#canon-resourcenew)
* [`canon resource.drop`](#canon-resourcedrop)
* [`canon resource.rep`](#canon-resourcerep)
* [`canon task.backpressure`](#-canon-taskbackpressure) 🔀
* [`canon context.get`](#-canon-contextget) 🔀
* [`canon context.set`](#-canon-contextset) 🔀
* [`canon backpressure.set`](#-canon-backpressureset) 🔀
* [`canon task.return`](#-canon-taskreturn) 🔀
* [`canon task.wait`](#-canon-taskwait) 🔀
* [`canon task.poll`](#-canon-taskpoll) 🔀
* [`canon task.yield`](#-canon-taskyield) 🔀
* [`canon yield`](#-canon-yield) 🔀
* [`canon waitable-set.new`](#-canon-waitable-setnew) 🔀
* [`canon waitable-set.wait`](#-canon-waitable-setwait) 🔀
* [`canon waitable-set.poll`](#-canon-waitable-setpoll) 🔀
* [`canon waitable-set.drop`](#-canon-waitable-setdrop) 🔀
* [`canon waitable.join`](#-canon-waitablejoin) 🔀
* [`canon subtask.drop`](#-canon-subtaskdrop) 🔀
* [`canon {stream,future}.new`](#-canon-streamfuturenew) 🔀
* [`canon {stream,future}.{read,write}`](#-canon-streamfuturereadwrite) 🔀
Expand Down Expand Up @@ -2990,19 +2995,19 @@ Note that the "locally-defined" requirement above ensures that only the
component instance defining a resource can access its representation.


### 🔀 `canon task.backpressure`
### 🔀 `canon backpressure.set`

For a canonical definition:
```wasm
(canon task.backpressure (core func $f))
(canon backpressure.set (core func $f))
```
validation specifies:
* `$f` is given type `[i32] -> []`

Calling `$f` invokes the following function, which sets the `backpressure`
flag on the current `ComponentInstance`:
```python
async def canon_task_backpressure(task, flat_args):
async def canon_backpressure_set(task, flat_args):
trap_if(task.opts.sync)
task.inst.backpressure = bool(flat_args[0])
return []
Expand Down Expand Up @@ -3533,6 +3538,8 @@ async def canon_error_context_drop(task, i):

### 🧵 `canon thread.spawn`

TODO: add new about new ContextLocalStorage

For a canonical definition:
```wasm
(canon thread.spawn (type $ft) (core func $st))
Expand Down
193 changes: 137 additions & 56 deletions design/mvp/Explainer.md
Original file line number Diff line number Diff line change
Expand Up @@ -1402,11 +1402,16 @@ canon ::= ...
| (canon resource.new <typeidx> (core func <id>?))
| (canon resource.drop <typeidx> async? (core func <id>?))
| (canon resource.rep <typeidx> (core func <id>?))
| (canon task.backpressure (core func <id>?)) 🔀
| (canon context.get <valtype> <u32> (core func <id>?)) 🔀
| (canon context.set <valtype> <u32> (core func <id>?)) 🔀
| (canon backpressure.set (core func <id>?)) 🔀
| (canon task.return (result <valtype>)? <canonopt>* (core func <id>?)) 🔀
| (canon task.wait async? (memory <core:memidx>) (core func <id>?)) 🔀
| (canon task.poll async? (memory <core:memidx>) (core func <id>?)) 🔀
| (canon task.yield async? (core func <id>?)) 🔀
| (canon yield async? (core func <id>?)) 🔀
| (canon waitable-set.new (core func <id>?)) 🔀
| (canon waitable-set.wait async? (memory <core:memidx>) (core func <id>?)) 🔀
| (canon waitable-set.poll async? (memory <core:memidx>) (core func <id>?)) 🔀
| (canon waitable-set.drop (core func <id>?)) 🔀
| (canon waitable.join (core func <id>?)) 🔀
| (canon subtask.drop (core func <id>?)) 🔀
| (canon stream.new <typeidx> (core func <id>?)) 🔀
| (canon stream.read <typeidx> <canonopt>* (core func <id>?)) 🔀
Expand Down Expand Up @@ -1530,18 +1535,43 @@ transferring ownership of the newly-created resource to the export's caller.
See the [async explainer](Async.md) for high-level context and terminology and
the [Canonical ABI explainer] for detailed runtime semantics.

###### 🔀 `task.backpressure`
###### 🔀 `context.get`

| Synopsis | |
| -------------------------- | ------------------ |
| Approximate WIT signature | `func<T,i>() -> T` |
| Canonical ABI signature | `[] -> [T]` |

The `context.get` built-in returns the `i`th element of the [current execution
context]'s [context-local storage array]. Validation currently restricts `i` to
be less than 2 and `t` to be `i32`, but will be relaxed in the future. (See
also [`canon_context_get`] in the Canonical ABI explainer for details.)

###### 🔀 `context.set`

| Synopsis | |
| -------------------------- | ----------------- |
| Approximate WIT signature | `func<T,i>(v: T)` |
| Canonical ABI signature | `[T] -> []` |

The `context.set` built-in sets the `i`th element of the [current execution
context]'s [context-local storage array] to the value `v`. Validation currently
restricts `i` to be less than 2 and `t` to be `i32`, but will be relaxed in the
future. (See also [`canon_context_set`] in the Canonical ABI explainer for
details.)

###### 🔀 `backpressure.set`

| Synopsis | |
| -------------------------- | --------------------- |
| Approximate WIT signature | `func(enable: bool)` |
| Canonical ABI signature | `[enable:i32] -> []` |

The `task.backpressure` built-in allows the async-lifted callee to toggle a
The `backpressure.set` built-in allows the async-lifted callee to toggle a
per-component-instance flag that, when set, prevents new incoming export calls
to the component (until the flag is unset). This allows the component to exert
[backpressure]. (See also [`canon_task_backpressure`] in the Canonical ABI
explainer.)
[backpressure]. (See also [`canon_backpressure_set`] in the Canonical ABI
explainer for details.)

###### 🔀 `task.return`

Expand All @@ -1553,24 +1583,52 @@ called, the declared return type and `canonopt`s are checked to exactly match
those of the current task. (See also "[Returning]" in the async explainer and
[`canon_task_return`] in the Canonical ABI explainer.)

###### 🔀 `task.wait`
###### 🔀 `yield`

| Synopsis | |
| -------------------------- | ------------------ |
| Approximate WIT signature | `func<async?>()` |
| Canonical ABI signature | `[] -> []` |

The `yield` built-in allows the runtime to switch to other tasks, enabling a
long-running computation to cooperatively interleave execution. If the `async`
immediate is present, the runtime can switch to other tasks in the *same*
component instance, which the calling core wasm must be prepared to handle. If
`async` is not present, only tasks in *other* component instances may be
switched to. (See also [`canon_yield`] in the Canonical ABI explainer for
details.)

| Synopsis | |
| -------------------------- | ---------------------------------------- |
| Approximate WIT signature | `func<async?>() -> event` |
| Canonical ABI signature | `[payload_addr:i32] -> [event-kind:i32]` |
###### 🔀 `waitable-set.new`

where `event`, `event-kind`, and `payload` are defined in WIT as:
| Synopsis | |
| -------------------------- | ------------------------ |
| Approximate WIT signature | `func() -> waitable-set` |
| Canonical ABI signature | `[] -> [i32]` |

The `waitable-set.new` built-in returns the `i32` index of a new [waitable
set]. The `waitable-set` type is not a true WIT-level type but instead serves
to document associated built-ins below. Waitable sets start out empty and are
populated explicitly with [waitables] by `waitable.join`. (See also
[`canon_waitable_set_new`] in the Canonical ABI explainer for details.)

###### 🔀 `waitable-set.wait`

| Synopsis | |
| -------------------------- | ---------------------------------------------- |
| Approximate WIT signature | `func<async?>(s: waitable-set) -> event` |
| Canonical ABI signature | `[s:i32 payload-addr:i32] -> [event-code:i32]` |

where `event`, `event-code`, and `payload` are defined in WIT as:
```wit
record event {
kind: event-kind,
kind: event-code,
payload: payload,
}
enum event-kind {
enum event-code {
none,
call-starting,
call-started,
call-returned,
yielded,
stream-read,
stream-write,
future-read,
Expand All @@ -1582,58 +1640,73 @@ record payload {
}
```

The `task.wait` built-in waits for one of the pending events to occur, and then
returns an `event` describing it.
The `waitable-set.wait` built-in waits for any one of the [waitables] in the
given [waitable set] `s` to make progress and then returns an `event`
describing the event. The `event-code` `none` is never returned. Waitable sets
may be `wait`ed upon when empty, in which case the caller will necessarily
block until another task adds a waitable to the set that can make progress.

If the `async` immediate is present, other tasks in the same component instance
can be started (via export call) or resumed while the current task blocks. If
`async` is not present, the current component instance will not execute any
code until `wait` returns (however, *other* component instances may execute
code in the interim).

In the Canonical ABI, the return value provides the `event-kind`, and the
`payload` value is stored at the address passed as the `payload_addr`
parameter. (See also "[Waiting]" in the async explainer and [`canon_task_wait`]
in the Canonical ABI explainer.)
In the Canonical ABI, the return value provides the `event-code`, and the
`payload` value is stored at the address passed as the `payload-addr`
parameter. (See also [`canon_waitable_set_wait`] in the Canonical ABI explainer
for details.)

###### 🔀 `task.poll`
###### 🔀 `waitable-set.poll`

| Synopsis | |
| -------------------------- | ----------------------------------- |
| Approximate WIT signature | `func<async?>() -> option<event> ` |
| Canonical ABI signature | `[event_addr:i32] -> [is_some:i32]` |
| Synopsis | |
| -------------------------- | ---------------------------------------------- |
| Approximate WIT signature | `func<async?>(s: waitable-set) -> event` |
| Canonical ABI signature | `[s:i32 payload-addr:i32] -> [event-code:i32]` |

where `event`, `event-kind`, and `payload` are defined as in [`task.wait`](#-taskwait).
where `event`, `event-code`, and `payload` are defined as in
[`waitable-set.wait`](#-waitable-setwait).

The `task.poll` built-in returns either `none` if no event was immediately
available, or `some` containing an event code and payload. `poll` implicitly
performs a `task.yield`, allowing other tasks to be scheduled before `poll`
returns. The `async?` immediate is passed to `task.yield`, determining whether
other tasks in the same component instance may execute.
The `waitable-set.poll` built-in returns the `event-code` `none` if no event
was available without blocking. `poll` implicitly performs a `yield`, allowing
other tasks to be scheduled before `poll` returns. The `async?` immediate is
passed to `yield`, determining whether other code in the same component
instance may execute.

In the Canonical ABI, the return value `is_some` holds a boolean value
indicating whether an event was immediately available, and if so, the `event`
value, containing the code and payloads are stored into the buffer pointed to
by `event_addr`. (See also [`canon_task_poll`] n the Canonical ABI explainer.)
The Canonical ABI of `waitable-set.poll` is the same as `waitable-set.wait`
(with the `none` case indicated by returning `0`). (See also
[`canon_waitable_set_poll`] in the Canonical ABI explainer for details.)

###### 🔀 `task.yield`
###### 🔀 `waitable-set.drop`

| Synopsis | |
| -------------------------- | ------------------ |
| Approximate WIT signature | `func<async?>()` |
| Canonical ABI signature | `[] -> []` |
| Synopsis | |
| -------------------------- | ------------------------ |
| Approximate WIT signature | `func(s: waitable-set)` |
| Canonical ABI signature | `[s:i32] -> []` |

The `task.yield` built-in allows the runtime to switch to another task,
enabling a long-running computation to cooperatively interleave execution with
other tasks.
The `waitable-set.drop` built-in removes the indicated [waitable set] from the
current instance's table of waitable sets, trapping if the waitable set is not
empty or if another task is concurrently `wait`ing on it. (See also
[`canon_waitable_set_drop`] in the Canonical ABI explainer for details.)

If the `async` immediate is present, other tasks in the same component instance
can be started (via export call) or resumed while the current task blocks and
thus the core wasm calling `task.yield` must be reentrant. If `async` is not
present, only tasks in *other* component instances may execute, and thus the
calling core wasm will not observe any reentrance.
###### 🔀 `waitable.join`

| Synopsis | |
| -------------------------- | ---------------------------------------------------- |
| Approximate WIT signature | `func(w: waitable, maybe_set: option<waitable-set>)` |
| Canonical ABI signature | `[w:i32, maybe_set:i32] -> []` |

The `waitable.join` built-in may be called given a [waitable] and an optional
[waitable set]. `join` first removes `w` from any waitable set that it is a
member of and then, if `maybe_set` is not `none`, `w` is added to that set.
Thus, `join` can be used to arbitrarily add, change and remove waitables from
waitable sets in the same component instance, preserving the invariant that a
waitable can be in at most one set.

(See also [`canon_task_yield`] in the Canonical ABI explainer.)
In the Canonical ABI, `w` is an index into the component instance's [waitables]
table and can be any type of waitable (`subtask` or
`{readable,writable}-{stream,future}-end`). (See also [`canon_waitable_join`]
in the Canonical ABI explainer for details.)

###### 🔀 `subtask.drop`

Expand Down Expand Up @@ -2702,11 +2775,16 @@ For some use-case-focused, worked examples, see:

[Adapter Functions]: FutureFeatures.md#custom-abis-via-adapter-functions
[Canonical ABI explainer]: CanonicalABI.md
[`canon_context_get`]: CanonicalABI.md#-canon-contextget
[`canon_context_set`]: CanonicalABI.md#-canon-contextset
[`canon_backpressure_set`]: CanonicalABI.md#-canon-backpressureset
[`canon_task_return`]: CanonicalABI.md#-canon-taskreturn
[`canon_task_wait`]: CanonicalABI.md#-canon-taskwait
[`canon_task_poll`]: CanonicalABI.md#-canon-taskpoll
[`canon_task_yield`]: CanonicalABI.md#-canon-taskyield
[`canon_task_backpressure`]: CanonicalABI.md#-canon-taskbackpressure
[`canon_yield`]: CanonicalABI.md#-canon-yield
[`canon_waitable_set_new`]: CanonicalABI.md#-canon-waitable-setnew
[`canon_waitable_set_wait`]: CanonicalABI.md#-canon-waitable-setwait
[`canon_waitable_set_poll`]: CanonicalABI.md#-canon-waitable-setpoll
[`canon_waitable_set_drop`]: CanonicalABI.md#-canon-waitable-setdrop
[`canon_waitable_join`]: CanonicalABI.md#-canon-waitablejoin
[`canon_stream_new`]: CanonicalABI.md#-canon-streamfuturenew
[`canon_stream_read`]: CanonicalABI.md#-canon-streamfuturereadwrite
[`canon_future_read`]: CanonicalABI.md#-canon-streamfuturereadwrite
Expand All @@ -2728,12 +2806,15 @@ For some use-case-focused, worked examples, see:

[Task]: Async.md#task
[Current Task]: Async.md#current-task
[Current Execution Context]: Async.md#current-execution-context
[Context-Local Storage Array]: Async.md#current-execution-context
[Subtask]: Async.md#subtask
[Stream or Future]: Async.md#streams-and-futures
[Readable or Writable End]: Async.md#streams-and-futures
[Writable End]: Async.md#streams-and-futures
[Waiting]: Async.md#waiting
[Waitables]: Async.md#waiting
[Waitable Set]: Async.md#waiting
[Backpressure]: Async.md#backpressure
[Returning]: Async.md#returning

Expand Down
Loading

0 comments on commit 67f4526

Please sign in to comment.