diff --git a/design/mvp/Async.md b/design/mvp/Async.md index 9cf7a16f..74fcebfa 100644 --- a/design/mvp/Async.md +++ b/design/mvp/Async.md @@ -325,7 +325,11 @@ These built-ins can either return immediately if >0 elements were able to be written or read immediately (without blocking) or return a sentinel "blocked" value indicating that the read or write will execute concurrently. The readable and writable ends of streams and futures can then be [waited](#waiting) on to -make progress. +make progress. Notification of progress signals *completion* of a read or write +(i.e., the bytes have already been copied into the buffer). Additionally, +*readiness* (to perform a read or write in the future) can be queried and +signalled by performing a `0`-length read or write (see the [Stream State] +section in the Canonical ABI explainer for details). The `T` element type of streams and futures is optional, such that `future` and `stream` can be written in WIT without a trailing ``. In this case, the diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index af38fdef..6f9adabc 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -363,8 +363,8 @@ class BufferGuestImpl(Buffer): length: int def __init__(self, t, cx, ptr, length): - trap_if(length == 0 or length > Buffer.MAX_LENGTH) - if t: + trap_if(length > Buffer.MAX_LENGTH) + if t and length > 0: trap_if(ptr != align_to(ptr, alignment(t))) trap_if(ptr + length * elem_size(t) > len(cx.opts.memory)) self.cx = cx @@ -1178,14 +1178,34 @@ but in the opposite direction. Both are implemented by a single underlying return 'blocked' else: ncopy = min(src.remain(), dst.remain()) - assert(ncopy > 0) - dst.write(src.read(ncopy)) - if self.pending_buffer.remain() > 0: - self.pending_on_partial_copy(self.reset_pending) + if ncopy > 0: + dst.write(src.read(ncopy)) + if self.pending_buffer.remain() > 0: + self.pending_on_partial_copy(self.reset_pending) + else: + self.reset_and_notify_pending() + return 'done' else: - self.reset_and_notify_pending() - return 'done' -``` + if self.pending_buffer.remain() == 0: + self.reset_and_notify_pending() + if buffer.remain() == 0: + return 'done' + else: + self.pending_buffer = buffer + self.pending_on_partial_copy = on_partial_copy + self.pending_on_copy_done = on_copy_done + return 'blocked' +``` +The meaning of a `read` or `write` when the length is `0` is that the caller +wants to know when the other side is "ready". When a non-`0`-length `read` or +`write` rendezvous with a `0`-length `write` or `read`, only the `0`-length +operation completes, keeping the non-`0`-length pending and ready for a future +rendezvous. In a rendezvous where *both* the `read` and `write` are `0`-length, +both operations complete. Thus, "readiness" does not guarantee "the next +operation will not block" and performing a `0`-length `read` or `write` doesn't +just *query* for readiness but also *signals* readiness to the other side. +Consequently, components should always follow a successful `0`-length `read` or +`write` with a non-`0`-length `read` or `write`. Given the above, we can define the `{Readable,Writable}StreamEnd` classes that are actually stored in the `waitables` table. The classes are almost entirely diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index f5d3e60d..070932cc 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -311,8 +311,8 @@ class BufferGuestImpl(Buffer): length: int def __init__(self, t, cx, ptr, length): - trap_if(length == 0 or length > Buffer.MAX_LENGTH) - if t: + trap_if(length > Buffer.MAX_LENGTH) + if t and length > 0: trap_if(ptr != align_to(ptr, alignment(t))) trap_if(ptr + length * elem_size(t) > len(cx.opts.memory)) self.cx = cx @@ -702,13 +702,23 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst): return 'blocked' else: ncopy = min(src.remain(), dst.remain()) - assert(ncopy > 0) - dst.write(src.read(ncopy)) - if self.pending_buffer.remain() > 0: - self.pending_on_partial_copy(self.reset_pending) + if ncopy > 0: + dst.write(src.read(ncopy)) + if self.pending_buffer.remain() > 0: + self.pending_on_partial_copy(self.reset_pending) + else: + self.reset_and_notify_pending() + return 'done' else: - self.reset_and_notify_pending() - return 'done' + if self.pending_buffer.remain() == 0: + self.reset_and_notify_pending() + if buffer.remain() == 0: + return 'done' + else: + self.pending_buffer = buffer + self.pending_on_partial_copy = on_partial_copy + self.pending_on_copy_done = on_copy_done + return 'blocked' class StreamEnd(Waitable): stream: ReadableStream diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index 8af438d6..8e504809 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -1457,8 +1457,19 @@ async def core_func1(task, args): assert(mem1[retp+0] == wsi) assert(mem1[retp+4] == 4) + [ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0) + assert(ret == definitions.BLOCKED) + fut4.set_result(None) + [event] = await canon_waitable_set_wait(False, mem1, task, seti, retp) + assert(event == EventCode.STREAM_WRITE) + assert(mem1[retp+0] == wsi) + assert(mem1[retp+4] == 0) + + [ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0) + assert(ret == 0) + [errctxi] = await canon_error_context_new(opts1, task, 0, 0) [] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi) [] = await canon_waitable_set_drop(task, seti) @@ -1498,6 +1509,9 @@ async def core_func2(task, args): fut2.set_result(None) await task.on_block(fut3) + [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0) + assert(ret == 0) + mem2[0:8] = bytes(8) [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2) assert(ret == 2) @@ -1508,9 +1522,19 @@ async def core_func2(task, args): await task.on_block(fut4) - [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2) + [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0) + assert(ret == 0) + + [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0) + assert(ret == definitions.BLOCKED) + + [event] = await canon_waitable_set_wait(False, mem2, task, seti, retp) + assert(event == EventCode.STREAM_READ) + assert(mem2[retp+0] == rsi) + p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False) errctxi = 1 - assert(ret == (definitions.CLOSED | errctxi)) + assert(p2 == (definitions.CLOSED | errctxi)) + [] = await canon_stream_close_readable(U8Type(), task, rsi) [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0)