Skip to content

Commit

Permalink
Allow stream.{read,write}s of length 0 to query/signal readiness
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Jan 25, 2025
1 parent d43430d commit 6eb1b86
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 20 deletions.
6 changes: 5 additions & 1 deletion design/mvp/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,11 @@ These built-ins can either return immediately if >0 elements were able to be
written or read immediately (without blocking) or return a sentinel "blocked"
value indicating that the read or write will execute concurrently. The readable
and writable ends of streams and futures can then be [waited](#waiting) on to
make progress.
make progress. Notification of progress signals *completion* of a read or write
(i.e., the bytes have already been copied into the buffer). Additionally,
*readiness* (to perform a read or write in the future) can be queried and
signalled by performing a `0`-length read or write (see the [Stream State]
section in the Canonical ABI explainer for details).

The `T` element type of streams and futures is optional, such that `future` and
`stream` can be written in WIT without a trailing `<T>`. In this case, the
Expand Down
38 changes: 29 additions & 9 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ class BufferGuestImpl(Buffer):
length: int

def __init__(self, t, cx, ptr, length):
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
if t:
trap_if(length > Buffer.MAX_LENGTH)
if t and length > 0:
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
self.cx = cx
Expand Down Expand Up @@ -1178,14 +1178,34 @@ but in the opposite direction. Both are implemented by a single underlying
return 'blocked'
else:
ncopy = min(src.remain(), dst.remain())
assert(ncopy > 0)
dst.write(src.read(ncopy))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
if ncopy > 0:
dst.write(src.read(ncopy))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
else:
self.reset_and_notify_pending()
return 'done'
else:
self.reset_and_notify_pending()
return 'done'
```
if self.pending_buffer.remain() == 0:
self.reset_and_notify_pending()
if buffer.remain() == 0:
return 'done'
else:
self.pending_buffer = buffer
self.pending_on_partial_copy = on_partial_copy
self.pending_on_copy_done = on_copy_done
return 'blocked'
```
The meaning of a `read` or `write` when the length is `0` is that the caller
wants to know when the other side is "ready". When a non-`0`-length `read` or
`write` rendezvous with a `0`-length `write` or `read`, only the `0`-length
operation completes, keeping the non-`0`-length pending and ready for a future
rendezvous. In a rendezvous where *both* the `read` and `write` are `0`-length,
both operations complete. Thus, "readiness" does not guarantee "the next
operation will not block" and performing a `0`-length `read` or `write` doesn't
just *query* for readiness but also *signals* readiness to the other side.
Consequently, components should always follow a successful `0`-length `read` or
`write` with a non-`0`-length `read` or `write`.

Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
are actually stored in the `waitables` table. The classes are almost entirely
Expand Down
26 changes: 18 additions & 8 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ class BufferGuestImpl(Buffer):
length: int

def __init__(self, t, cx, ptr, length):
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
if t:
trap_if(length > Buffer.MAX_LENGTH)
if t and length > 0:
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
self.cx = cx
Expand Down Expand Up @@ -702,13 +702,23 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst):
return 'blocked'
else:
ncopy = min(src.remain(), dst.remain())
assert(ncopy > 0)
dst.write(src.read(ncopy))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
if ncopy > 0:
dst.write(src.read(ncopy))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
else:
self.reset_and_notify_pending()
return 'done'
else:
self.reset_and_notify_pending()
return 'done'
if self.pending_buffer.remain() == 0:
self.reset_and_notify_pending()
if buffer.remain() == 0:
return 'done'
else:
self.pending_buffer = buffer
self.pending_on_partial_copy = on_partial_copy
self.pending_on_copy_done = on_copy_done
return 'blocked'

class StreamEnd(Waitable):
stream: ReadableStream
Expand Down
28 changes: 26 additions & 2 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1457,8 +1457,19 @@ async def core_func1(task, args):
assert(mem1[retp+0] == wsi)
assert(mem1[retp+4] == 4)

[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
assert(ret == definitions.BLOCKED)

fut4.set_result(None)

[event] = await canon_waitable_set_wait(False, mem1, task, seti, retp)
assert(event == EventCode.STREAM_WRITE)
assert(mem1[retp+0] == wsi)
assert(mem1[retp+4] == 0)

[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
assert(ret == 0)

[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
[] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi)
[] = await canon_waitable_set_drop(task, seti)
Expand Down Expand Up @@ -1498,6 +1509,9 @@ async def core_func2(task, args):
fut2.set_result(None)
await task.on_block(fut3)

[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
assert(ret == 0)

mem2[0:8] = bytes(8)
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
assert(ret == 2)
Expand All @@ -1508,9 +1522,19 @@ async def core_func2(task, args):

await task.on_block(fut4)

[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
assert(ret == 0)

[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
assert(ret == definitions.BLOCKED)

[event] = await canon_waitable_set_wait(False, mem2, task, seti, retp)
assert(event == EventCode.STREAM_READ)
assert(mem2[retp+0] == rsi)
p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False)
errctxi = 1
assert(ret == (definitions.CLOSED | errctxi))
assert(p2 == (definitions.CLOSED | errctxi))

[] = await canon_stream_close_readable(U8Type(), task, rsi)
[] = await canon_waitable_set_drop(task, seti)
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)
Expand Down

0 comments on commit 6eb1b86

Please sign in to comment.