Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'stream' and 'future' types #405

Merged
merged 22 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7d1034a
Add 'stream' and 'future' types
lukewagner Oct 11, 2024
e8198f7
Grammar fix
lukewagner Oct 21, 2024
e6469e8
Spelling fix
lukewagner Oct 21, 2024
15298d5
Clarify wording in Async.md concerning the writable end
lukewagner Oct 21, 2024
fa3cd3a
Mention the callback option alongside task.wait
lukewagner Oct 21, 2024
a864991
Add <typeidx> to {stream,future}.{read,write}
lukewagner Oct 22, 2024
0b5247f
Handle the concurrently-closed case in {stream,future}.cancel-{read,w…
lukewagner Oct 24, 2024
b20409a
Add note on spec-internal state vs. implementation
lukewagner Oct 24, 2024
a15ec3f
Update channel/pipe wording
lukewagner Oct 24, 2024
4e456d7
Improve wording
lukewagner Oct 24, 2024
08b1387
Put the canonopts on {stream,future}.{read,write} instead of copying …
lukewagner Oct 25, 2024
87f7b85
Allow sync task.{wait,yield,poll} and {stream,future}.{read,write}
lukewagner Oct 25, 2024
300c86c
Only enforce scoping for streams/futures containing borrows
lukewagner Oct 28, 2024
70d727a
Break waitable.drop into subtask.drop and {stream,future}.close-{read…
lukewagner Oct 30, 2024
3cf3d5f
Remove dangling syntax rule for waitable.drop
lukewagner Oct 31, 2024
4581ba5
Update subsection links and other dangling waitable.drop reference
lukewagner Oct 31, 2024
e074f41
Add 'error' type and 'canon error.{new,debug-message,drop}' built-ins
lukewagner Oct 29, 2024
30061e5
Update {stream,future}.close-writable descriptions
lukewagner Nov 1, 2024
4306ee2
Add example to explainer text about 'error'
lukewagner Nov 1, 2024
fcea885
Remove restriction on write-before-lift, remove invalid assert, add test
lukewagner Nov 3, 2024
f9b341b
Add <typeidx> to {stream,future}.cancel-{read,write}
lukewagner Nov 4, 2024
e8d192e
Rename 'error' to 'error-context'
lukewagner Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 129 additions & 27 deletions design/mvp/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ summary of the motivation and animated sketch of the design in action.
* [Current task](#current-task)
* [Subtask and Supertask](#subtask-and-supertask)
* [Structured concurrency](#structured-concurrency)
* [Streams and Futures](#streams-and-futures)
* [Waiting](#waiting)
* [Backpressure](#backpressure)
* [Returning](#returning)
Expand Down Expand Up @@ -106,8 +107,30 @@ Thus, backpressure combined with the partitioning of low-level state provided
by the Component Model enables sync and async code to interoperate while
preserving the expectations of both.

[TODO](#todo): `future` and `stream` types that can be used in function
signatures will be added next.
In addition to being able to define and call whole functions asynchronously,
the `stream` and `future` types can be used in function signatures to pass
parameters and results incrementally over time, achieving finer-grained
concurrency. Streams and futures are thus not defined to be free-standing
resources with their own internal memory buffers (like a traditional channel or
pipe) but, rather, more-primitive control-flow mechanisms that synchronize the
incremental passing of parameters and results during cross-component calls.
Higher-level resources like channels and pipes could then be defined in terms
of these lower-level `stream` and `future` primitives, e.g.:
```wit
resource pipe {
constructor(buffer-size: u32);
write: func(bytes: stream<u8>) -> result;
read: func() -> stream<u8>;
}
```
but also many other domain-specific concurrent resources like WASI HTTP request
and response bodies or WASI blobs. Streams and futures are however high-level
enough to be bound automatically to many source languages' built-in concurrency
features like futures, promises, streams, generators and iterators, unlike
lower-level concurrency primitives (like callbacks or `wasi:io@0.2.0`
`pollable`s). Thus, the Component Model seeks to provide the lowest-level
fine-grained concurrency primitives that are high-level and idiomatic enough to
enable automatic generation of usable language-integrated bindings.


## Concepts
Expand Down Expand Up @@ -180,18 +203,80 @@ invocation of an export by the host. Moreover, at any one point in time, the
set of tasks active in a linked component graph form a forest of async call
trees which e.g., can be visualized using a traditional flamegraph.

The Canonical ABI's Python code enforces Structured Concurrency by maintaining
a simple per-[`Task`] `num_async_subtasks` counter that traps if not zero when
the `Task` finishes.
The Canonical ABI's Python code enforces Structured Concurrency by incrementing
a per-[`Task`] counter when a `Subtask` is created, decrementing when a
dicej marked this conversation as resolved.
Show resolved Hide resolved
`Subtask` is destroyed, and trapping if the counter is not zero when the `Task`
attempts to exit.

### Streams and Futures

Streams and Futures have two "ends": a *readable end* and *writable end*. When
*consuming* a `stream` or `future` value as a parameter (of an export call with
a `stream` or `future` somewhere in the parameter types) or result (of an
import call with a `stream` or `future` somewhere in the result type), the
sunfishcode marked this conversation as resolved.
Show resolved Hide resolved
receiver always gets *unique ownership* of the *readable end* of the `stream`
or `future`. When *producing* a `stream` or `future` value as a parameter (of
an import call) or result (of an export call), the producer can either
*transfer ownership* of a readable end it has already received or it can create
a fresh writable end (via `stream.new` or `future.new`) and then lift this
writable end to create a fresh readable end in the consumer while maintaining
ownership of the writable end in the producer. To maintain the invariant that
readable ends are unique, a writable end can be lifted at most once, trapping
otherwise.

Based on this, `stream<T>` and `future<T>` values can be passed between
functions as if they were synchronous `list<T>` and `T` values, resp. For
example, given `f` and `g` with types:
```wit
f: func(x: whatever) -> stream<T>;
g: func(s: stream<T>) -> stuff;
```
`g(f(x))` works as you might hope, concurrently streaming `x` into `f` which
concurrently streams its results into `g`. If `f` has an error, it can close
its returned `stream<T>` with an [`error-context`](Explainer.md#error-context-type)
value which `g` will receive along with the notification that its readable
stream was closed.

If a component instance *would* receive the readable end of a stream for which
it already owns the writable end, the readable end disappears and the existing
writable end is received instead (since the guest can now handle the whole
stream more efficiently wholly from within guest code). E.g., if the same
component instance defined `f` and `g` above, the composition `g(f(x))` would
just instruct the guest to stream directly from `f` into `g` without crossing a
component boundary or performing any extra copies. Thus, strengthening the
previously-mentioned invariant, the readable and writable ends of a stream are
unique *and never in the same component*.

Given the readable or writable end of a stream, core wasm code can call the
imported `stream.read` or `stream.write` canonical built-ins, resp., passing the
pointer and length of a linear-memory buffer to write-into or read-from, resp.
These built-ins can either return immediately if >0 elements were able to be
written or read immediately (without blocking) or return a sentinel "blocked"
value indicating that the read or write will execute concurrently. The
readable and writable ends of streams and futures each have a well-defined
parent `Task` that will receive "progress" events on all child streams/futures
that have previously blocked.

From a [structured-concurrency](#structured-concurrency) perspective, the
readable and writable ends of streams and futures are leaves of the async call
tree. Unlike subtasks, the parent of the readable ends of streams and future
*can* change over time (when transferred via function call, as mentioned
above). However, there is always *some* parent `Task` and this parent `Task`
is prevented from orphaning its children using the same reference-counting
guard mentioned above for subtasks.

### Waiting

When a component asynchronously lowers an import, it is explicitly requesting
that, if the import blocks, control flow be returned back to the calling task
so that it can do something else. Eventually though a task may run out of other
so that it can do something else. Similarly, if `stream.read` or `stream.write`
would block, they return a "blocked" code so that the caller can continue to
make progress on other things. But eventually, a task will run out of other
things to do and will need to **wait** for progress on one of the task's
subtasks. While a task is waiting, the runtime can switch to other running
tasks or start new tasks by invoking exports.
subtasks, readable stream ends, writable stream ends, readable future ends or
writable future ends, which are collectively called its **waitables**. While a
task is waiting on its waitables, the Component Model runtime can switch to
other running tasks or start new tasks by invoking exports.
dicej marked this conversation as resolved.
Show resolved Hide resolved

The Canonical ABI provides two ways for a task to wait:
* The task can call the [`task.wait`] built-in to synchronously wait for
Expand Down Expand Up @@ -234,13 +319,23 @@ the "started" state.

### Returning

The way an async Core WebAssembly function returns its value is by calling
[`task.return`], passing the core values that are to be lifted.

The main reason to have `task.return` is so that a task can continue execution
after returning its value. This is useful for various finalization tasks (such
as logging, billing or metrics) that don't need to be on the critical path of
returning a value to the caller.
The way an async function returns its value is by calling [`task.return`],
passing the core values that are to be lifted as *parameters*. Additionally,
when the `always-task-return` `canonopt` is set, synchronous functions also
return their values by calling `task.return` (as a more expressive and
general alternative to `post-return`).

Returning values by calling `task.return` allows a task to continue executing
even after it has passed its initial results to the caller. This can be useful
for various finalization tasks (freeing memory or performing logging, billing
or metrics operations) that don't need to be on the critical path of returning
a value to the caller, but the major use of executing code after `task.return`
is to continue to read and write from streams and futures. For example, a
stream transformer function of type `func(in: stream<T>) -> stream<U>` will
immediately `task.return` a stream created via `stream.new` and then sit in a
loop interleaving `stream.read`s (of the readable end passed for `in`) and
`stream.write`s (of the writable end it `stream.new`ed) before exiting the
task.

A task may not call `task.return` unless it is in the "started" state. Once
`task.return` is called, the task is in the "returned" state. A task can only
Expand Down Expand Up @@ -419,21 +514,26 @@ For now, this remains a [TODO](#todo) and validation will reject `async`-lifted

## TODO

Native async support is being proposed in progressive chunks. The following
features will be added in future chunks to complete "async" in Preview 3:
* `future`/`stream`/`error`: add for use in function types for finer-grained
concurrency
* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
no longer wanted and to please wrap it up promptly
* allow "tail-calling" a subtask so that the current wasm instance can be torn
down eagerly
* `task.index`+`task.wake`: allow tasks in the same instance to wait on and
wake each other (async condvar-style)
Native async support is being proposed incrementally. The following features
will be added in future chunks roughly in the order list to complete the full
"async" story, with a TBD cutoff between what's in [WASI Preview 3] and what
comes after:
* `nonblocking` function type attribute: allow a function to declare in its
type that it will not transitively do anything blocking
* define what `async` means for `start` functions (top-level await + background
tasks), along with cross-task coordination built-ins
* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
no longer wanted and to please wrap it up promptly
* zero-copy forwarding/splicing and built-in way to "tail-call" a subtask so
that the current wasm instance can be torn down eagerly while preserving
structured concurrency
* some way to say "no more elements are coming for a while"
* `recursive` function type attribute: allow a function to be reentered
recursively (instead of trapping)
* enable `async` `start` functions
recursively (instead of trapping) and link inner and outer activations
* add `stringstream` specialization of `stream<char>` (just like `string` is
a specialization of `list<char>`)
* allow pipelining multiple `stream.read`/`write` calls
* allow chaining multiple async calls together ("promise pipelining")
* integrate with `shared`: define how to lift and lower functions `async` *and*
`shared`

Expand Down Expand Up @@ -475,3 +575,5 @@ features will be added in future chunks to complete "async" in Preview 3:
[stack-switching]: https://github.com/WebAssembly/stack-switching/
[JSPI]: https://github.com/WebAssembly/js-promise-integration/
[shared-everything-threads]: https://github.com/webAssembly/shared-everything-threads

[WASI Preview 3]: https://github.com/WebAssembly/WASI/tree/main/wasip2#looking-forward-to-preview-3
30 changes: 27 additions & 3 deletions design/mvp/Binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ primvaltype ::= 0x7f => bool
| 0x75 => f64
| 0x74 => char
| 0x73 => string
| 0x64 => error-context
defvaltype ::= pvt:<primvaltype> => pvt
| 0x72 lt*:vec(<labelvaltype>) => (record (field lt)*) (if |lt*| > 0)
| 0x71 case*:vec(<case>) => (variant case+) (if |case*| > 0)
Expand All @@ -202,6 +203,8 @@ defvaltype ::= pvt:<primvaltype> => pvt
| 0x6a t?:<valtype>? u?:<valtype>? => (result t? (error u)?)
| 0x69 i:<typeidx> => (own i)
| 0x68 i:<typeidx> => (borrow i)
| 0x66 i:<typeidx> => (stream i)
| 0x65 i:<typeidx> => (future i)
labelvaltype ::= l:<label'> t:<valtype> => l t
case ::= l:<label'> t?:<valtype>? 0x00 => (case l t?)
label' ::= len:<u32> l:<label> => l (if len = |l|)
Expand Down Expand Up @@ -287,10 +290,29 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x06 => (canon thread.hw_concurrency (core func)) 🧵
| 0x08 => (canon task.backpressure (core func)) 🔀
| 0x09 ft:<core:typeidx> => (canon task.return ft (core func)) 🔀
| 0x0a m:<core:memdix> => (canon task.wait (memory m) (core func)) 🔀
| 0x0b m:<core:memidx> => (canon task.poll (memory m) (core func)) 🔀
| 0x0c => (canon task.yield (core func)) 🔀
| 0x0a async?:<async>? m:<core:memdix> => (canon task.wait async? (memory m) (core func)) 🔀
| 0x0b async?:<async>? m:<core:memidx> => (canon task.poll async? (memory m) (core func)) 🔀
| 0x0c async?:<async>? => (canon task.yield async? (core func)) 🔀
| 0x0d => (canon subtask.drop (core func)) 🔀
| 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
| 0x0f t:<typeidx> opts:<opts> => (canon stream.read t opts (core func)) 🔀
| 0x10 t:<typeidx> opts:<opts> => (canon stream.write t opts (core func)) 🔀
| 0x11 t:<typeidx> async?:<async?> => (canon stream.cancel-read async? (core func)) 🔀
| 0x12 t:<typeidx> async?:<async?> => (canon stream.cancel-write async? (core func)) 🔀
| 0x13 t:<typeidx> => (canon stream.close-readable t (core func)) 🔀
| 0x14 t:<typeidx> => (canon stream.close-writable t (core func)) 🔀
| 0x15 t:<typeidx> => (canon future.new t (core func)) 🔀
| 0x16 t:<typeidx> opts:<opts> => (canon future.read t opts (core func)) 🔀
| 0x17 t:<typeidx> opts:<opts> => (canon future.write t opts (core func)) 🔀
| 0x18 t:<typeidx> async?:<async?> => (canon future.cancel-read async? (core func)) 🔀
| 0x19 t:<typeidx> async?:<async?> => (canon future.cancel-write async? (core func)) 🔀
| 0x1a t:<typeidx> => (canon future.close-readable t (core func)) 🔀
| 0x1b t:<typeidx> => (canon future.close-writable t (core func)) 🔀
| 0x1c opts:<opts> => (canon error-context.new opts (core func)) 🔀
| 0x1d opts:<opts> => (canon error-context.debug-message opts (core func)) 🔀
| 0x1e => (canon error-context.drop (core func)) 🔀
async? ::= 0x00 =>
| 0x01 => async
opts ::= opt*:vec(<canonopt>) => opt*
canonopt ::= 0x00 => string-encoding=utf8
| 0x01 => string-encoding=utf16
Expand All @@ -300,6 +322,7 @@ canonopt ::= 0x00 => string-encod
| 0x05 f:<core:funcidx> => (post-return f)
| 0x06 => async 🔀
| 0x07 f:<core:funcidx> => (callback f) 🔀
| 0x08 => always-task-return 🔀
```
Notes:
* The second `0x00` byte in `canon` stands for the `func` sort and thus the
Expand Down Expand Up @@ -459,6 +482,7 @@ named once.

## Binary Format Warts to Fix in a 1.0 Release

* The opcodes (for types, canon built-ins, etc) should be re-sorted
* The two `list` type codes should be merged into one with an optional immediate.
* The `0x00` prefix byte of `importname'` and `exportname'` will be removed or repurposed.

Expand Down
Loading