Skip to content

Commit

Permalink
[WIP] add waitable-set
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Jan 13, 2025
1 parent 0b3002d commit b67cd8d
Show file tree
Hide file tree
Showing 6 changed files with 762 additions and 346 deletions.
245 changes: 179 additions & 66 deletions design/mvp/Async.md

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions design/mvp/Binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,11 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x04 rt:<typeidx> => (canon resource.rep rt (core func))
| 0x05 ft:<typeidx> => (canon thread.spawn ft (core func)) 🧵
| 0x06 => (canon thread.available_parallelism (core func)) 🧵
| 0x08 => (canon task.backpressure (core func)) 🔀
| 0x08 => (canon backpressure.set (core func)) 🔀
| 0x09 rs:<resultlist> opts:<opts> => (canon task.return rs opts (core func)) 🔀
| 0x0a async?:<async>? m:<core:memdix> => (canon task.wait async? (memory m) (core func)) 🔀
| 0x0b async?:<async>? m:<core:memidx> => (canon task.poll async? (memory m) (core func)) 🔀
| 0x0c async?:<async>? => (canon task.yield async? (core func)) 🔀
| 0x0a 0x7f i:<u32> => (canon context.get i32 i (core func)) 🔀
| 0x0b 0x7f i:<u32> => (canon context.set i32 i (core func)) 🔀
| 0x0c async?:<async>? => (canon yield async? (core func)) 🔀
| 0x0d => (canon subtask.drop (core func)) 🔀
| 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
| 0x0f t:<typeidx> opts:<opts> => (canon stream.read t opts (core func)) 🔀
Expand All @@ -313,6 +313,11 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x1c opts:<opts> => (canon error-context.new opts (core func)) 🔀
| 0x1d opts:<opts> => (canon error-context.debug-message opts (core func)) 🔀
| 0x1e => (canon error-context.drop (core func)) 🔀
| 0x1f => (canon waitable-set.new (core func)) 🔀
| 0x20 async?:<async>? m:<core:memidx> => (canon waitable-set.wait async? (memory m) (core func)) 🔀
| 0x21 async?:<async>? m:<core:memidx> => (canon waitable-set.poll async? (memory m) (core func)) 🔀
| 0x22 => (canon waitable-set.drop (core func)) 🔀
| 0x23 => (canon waitable.join (core func)) 🔀
async? ::= 0x00 =>
| 0x01 => async
opts ::= opt*:vec(<canonopt>) => opt*
Expand Down
146 changes: 97 additions & 49 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@ being specified here.
* [`canon resource.new`](#canon-resourcenew)
* [`canon resource.drop`](#canon-resourcedrop)
* [`canon resource.rep`](#canon-resourcerep)
* [`canon task.backpressure`](#-canon-taskbackpressure) 🔀
* [`canon context.get`](#-canon-contextget) 🔀
* [`canon context.set`](#-canon-contextset) 🔀
* [`canon backpressure.set`](#-canon-backpressureset) 🔀
* [`canon task.return`](#-canon-taskreturn) 🔀
* [`canon task.wait`](#-canon-taskwait) 🔀
* [`canon task.poll`](#-canon-taskpoll) 🔀
* [`canon task.yield`](#-canon-taskyield) 🔀
* [`canon yield`](#-canon-yield) 🔀
* [`canon waitable-set.new`](#-canon-waitable-setnew) 🔀
* [`canon waitable-set.wait`](#-canon-waitable-setwait) 🔀
* [`canon waitable-set.poll`](#-canon-waitable-setpoll) 🔀
* [`canon waitable-set.drop`](#-canon-waitable-setdrop) 🔀
* [`canon waitable.join`](#-canon-waitablejoin) 🔀
* [`canon subtask.drop`](#-canon-subtaskdrop) 🔀
* [`canon {stream,future}.new`](#-canon-streamfuturenew) 🔀
* [`canon {stream,future}.{read,write}`](#-canon-streamfuturereadwrite) 🔀
Expand Down Expand Up @@ -159,6 +164,7 @@ behavior and enforce invariants.
class ComponentInstance:
resources: Table[ResourceHandle]
waitables: Table[Waitable]
waitable_sets: Table[WaitableSet]
error_contexts: Table[ErrorContext]
num_tasks: int
may_leave: bool
Expand All @@ -170,6 +176,7 @@ class ComponentInstance:
def __init__(self):
self.resources = Table[ResourceHandle]()
self.waitables = Table[Waitable]()
self.waitable_sets = Table[WaitableSet]()
self.error_contexts = Table[ErrorContext]()
self.num_tasks = 0
self.may_leave = True
Expand All @@ -194,7 +201,7 @@ class Table(Generic[ElemT]):
array: list[Optional[ElemT]]
free: list[int]

MAX_LENGTH = 2**30 - 1
MAX_LENGTH = 2**28 - 1

def __init__(self):
self.array = [None]
Expand Down Expand Up @@ -390,6 +397,26 @@ that do all the heavy lifting are shared with function parameter/result lifting
and lowering and defined below.


#### Context-Local Storage

TODO
```python
class ContextLocalStorage:
LENGTH = 2
array: list[int]

def __init__(self):
self.array = [0] * ContextLocalStorage.LENGTH

def set(self, i, v):
assert(types_match_values(['i32'], [v]))
self.array[i] = v

def get(self, i):
return self.array[i]
```


#### Task State

A `Task` object is created for each call to `canon_lift` and is implicitly
Expand All @@ -404,9 +431,9 @@ class Task:
caller: Optional[Task]
on_return: Optional[Callable]
on_block: Callable[[Awaitable], Awaitable]
waitable_set: WaitableSet
num_subtasks: int
num_borrows: int
context: ContextLocalStorage

def __init__(self, opts, inst, ft, caller, on_return, on_block):
self.opts = opts
Expand All @@ -418,6 +445,7 @@ class Task:
self.waitable_set = WaitableSet()
self.num_subtasks = 0
self.num_borrows = 0
self.context = ContextLocalStorage()
```
Using a conservative syntactic analysis of a complete component, an optimizing
implementation can statically eliminate fields when a particular feature (such
Expand Down Expand Up @@ -586,17 +614,6 @@ in `enter`, preventing more pending tasks from being started in the interim.
pending_future.set_result(None)
```

The `Task.wait` and `Task.poll` methods delegate to `WaitableSet`, defined in
the next section.
```python
async def wait(self, sync) -> EventTuple:
return await self.wait_on(sync, self.waitable_set.wait())

async def poll(self, sync) -> Optional[EventTuple]:
await self.yield_(sync)
return self.waitable_set.poll()
```

The `Task.yield_` method is called by `canon task.yield` or, when `callback` is
used, when core wasm returns the "yield" code to the event loop. Yielding
allows the runtime to switch execution to another task without having to wait
Expand Down Expand Up @@ -717,7 +734,6 @@ decrement of `num_tasks`.
def exit(self):
assert(Task.current.locked())
trap_if(self.num_subtasks > 0)
self.waitable_set.drop()
trap_if(self.on_return)
assert(self.num_borrows == 0)
trap_if(self.inst.num_tasks == 1 and self.inst.backpressure)
Expand Down Expand Up @@ -745,19 +761,19 @@ code that produces the events (specifically, in `subtask_event` and
`copy_event`).
```python
class CallState(IntEnum):
STARTING = 0
STARTED = 1
RETURNED = 2
STARTING = 1
STARTED = 2
RETURNED = 3

class EventCode(IntEnum):
NONE = 0
CALL_STARTING = CallState.STARTING
CALL_STARTED = CallState.STARTED
CALL_RETURNED = CallState.RETURNED
YIELDED = 3
STREAM_READ = 4
STREAM_WRITE = 5
FUTURE_READ = 6
FUTURE_WRITE = 7
STREAM_READ = 5
STREAM_WRITE = 6
FUTURE_READ = 7
FUTURE_WRITE = 8

EventTuple = tuple[EventCode, int, int]
```
Expand Down Expand Up @@ -850,15 +866,19 @@ polling.
class WaitableSet:
elems: list[Waitable]
maybe_has_pending_event: asyncio.Event
num_waiting: int

def __init__(self):
self.elems = []
self.maybe_has_pending_event = asyncio.Event()
self.num_waiting = 0

async def wait(self) -> EventTuple:
self.num_waiting += 1
while True:
await self.maybe_has_pending_event.wait()
if (e := self.poll()):
self.num_waiting -= 1
return e

def poll(self) -> Optional[EventTuple]:
Expand All @@ -873,11 +893,11 @@ class WaitableSet:

def drop(self):
trap_if(len(self.elems) > 0)
trap_if(self.num_waiting > 0)
```
The `WaitableSet.drop` method traps if dropped (by the owning `Task`) while it
still contains elements (whose `Waitable.waitable_set` field would become
dangling). This can happen if a task tries to exit in the middle of a stream or
future copy operation.
The `WaitableSet.drop` method traps if dropped while it still contains elements
(whose `Waitable.waitable_set` field would become dangling) or if it is being
waited-upon by another `Task`.

Note: the `random.shuffle` in `poll` is meant to give runtimes the semantic
freedom to schedule delivery of events non-deterministically (e.g., taking into
Expand Down Expand Up @@ -929,7 +949,6 @@ turn only happens if the call is `async` *and* blocks. In this case, the
self.supertask = task
self.supertask.num_subtasks += 1
Waitable.__init__(self)
Waitable.join(self, task.waitable_set)
return task.inst.waitables.add(self)
```
The `num_subtasks` increment ensures that the parent `Task` cannot `exit`
Expand Down Expand Up @@ -2700,23 +2719,34 @@ async def canon_lift(opts, inst, ft, callee, caller, on_start, on_return, on_blo
[] = await call_and_trap_on_throw(callee, task, flat_args)
assert(types_match_values(flat_ft.results, []))
else:
[packed_ctx] = await call_and_trap_on_throw(callee, task, flat_args)
assert(types_match_values(flat_ft.results, [packed_ctx]))
while packed_ctx != 0:
is_yield = bool(packed_ctx & 1)
ctx = packed_ctx & ~1
if is_yield:
await task.yield_(sync = False)
event, p1, p2 = (EventCode.YIELDED, 0, 0)
[packed] = await call_and_trap_on_throw(callee, task, flat_args)
while True:
code,si = unpack_callback_result(packed)
match code:
case CallbackCode.EXIT:
break
case CallbackCode.WAIT:
s = task.inst.waitable_sets.get(si)
e = await task.wait_on(opts.sync, s.wait())
case CallbackCode.POLL:
s = task.inst.waitable_sets.get(si)
await task.yield_(opts.sync)
e = s.poll()
case CallbackCode.YIELD:
await task.yield_(opts.sync)
e = None
if e:
event, p1, p2 = e
else:
event, p1, p2 = await task.wait(sync = False)
[packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, p1, p2])
event, p1, p2 = (EventCode.NONE, 0, 0)
[packed] = await call_and_trap_on_throw(opts.callback, task, [event, p1, p2])
task.exit()
```
In the `sync` case, if the `always-task-return` ABI option is *not* set, then
`task.return_` will be called by `callee` to return values; otherwise,
`task.return_` must be called by `canon_lift`.

TODO
In the `async` case, there are two sub-cases depending on whether the
`callback` `canonopt` was set. When `callback` is present, waiting happens in
an "event loop" inside `canon_lift` which also allows yielding (i.e., allowing
Expand All @@ -2727,6 +2757,24 @@ coroutine) to switch to another task. Thus, `callback` is an optimization for
avoiding fiber creation for async languages that don't need it (e.g., JS,
Python, C# and Rust).

TODO
```python
class CallbackCode(IntEnum):
EXIT = 0
WAIT = 1
POLL = 2
YIELD = 3
MAX = 3

def unpack_callback_result(packed):
code = packed & 0xf
trap_if(code > CallbackCode.MAX)
assert(packed < 2**32)
assert(Table.MAX_LENGTH < 2**28)
waitable_set_index = packed >> 4
return (CallbackCode(code), waitable_set_index)
```

Uncaught Core WebAssembly [exceptions] result in a trap at component
boundaries. Thus, if a component wishes to signal an error, it must use some
sort of explicit type such as `result` (whose `error` case particular language
Expand Down Expand Up @@ -2838,9 +2886,9 @@ immediately return control flow back to the `async` caller if `callee` blocks:
subtask.finish()
return (EventCode(subtask.state), subtaski, 0)
subtask.set_event(subtask_event)
assert(0 < subtaski <= Table.MAX_LENGTH < 2**30)
assert(0 <= int(subtask.state) < 2**2)
flat_results = [subtaski | (int(subtask.state) << 30)]
assert(0 < subtaski <= Table.MAX_LENGTH < 2**28)
assert(0 <= int(subtask.state) < 2**4)
flat_results = [int(subtask.state) | (subtaski << 4)]

return flat_results
```
Expand Down Expand Up @@ -2990,19 +3038,19 @@ Note that the "locally-defined" requirement above ensures that only the
component instance defining a resource can access its representation.


### 🔀 `canon task.backpressure`
### 🔀 `canon backpressure.set`

For a canonical definition:
```wasm
(canon task.backpressure (core func $f))
(canon backpressure.set (core func $f))
```
validation specifies:
* `$f` is given type `[i32] -> []`

Calling `$f` invokes the following function, which sets the `backpressure`
flag on the current `ComponentInstance`:
```python
async def canon_task_backpressure(task, flat_args):
async def canon_backpressure_set(task, flat_args):
trap_if(task.opts.sync)
task.inst.backpressure = bool(flat_args[0])
return []
Expand Down Expand Up @@ -3284,15 +3332,13 @@ context-switching overhead.
def copy_event(revoke_buffer):
revoke_buffer()
e.copying = False
e.join(None)
return (event_code, i, pack_copy_result(task, buffer, e))
def on_partial_copy(revoke_buffer):
e.set_event(partial(copy_event, revoke_buffer))
def on_copy_done():
e.set_event(partial(copy_event, revoke_buffer = lambda:()))
if e.copy(buffer, on_partial_copy, on_copy_done) != 'done':
e.copying = True
e.join(task.waitable_set)
return [BLOCKED]
return [pack_copy_result(task, buffer, e)]
```
Expand Down Expand Up @@ -3533,6 +3579,8 @@ async def canon_error_context_drop(task, i):

### 🧵 `canon thread.spawn`

TODO: add new about new ContextLocalStorage

For a canonical definition:
```wasm
(canon thread.spawn (type $ft) (core func $st))
Expand Down
Loading

0 comments on commit b67cd8d

Please sign in to comment.