Skip to content

Commit

Permalink
Relax stream.{read,write} to allow lengths of 0
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Jan 24, 2025
1 parent ecfd9bb commit 519b688
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
24 changes: 17 additions & 7 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class BufferGuestImpl(Buffer):
length: int

def __init__(self, t, cx, ptr, length):
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
trap_if(length > Buffer.MAX_LENGTH)
if t:
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
Expand Down Expand Up @@ -702,13 +702,23 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst):
return 'blocked'
else:
ncopy = min(src.remain(), dst.remain())
assert(ncopy > 0)
dst.write(src.read(ncopy))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
if ncopy > 0:
dst.write(src.read(ncopy))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
else:
self.reset_and_notify_pending()
return 'done'
else:
self.reset_and_notify_pending()
return 'done'
if self.pending_buffer.remain() == 0:
self.reset_and_notify_pending()
if buffer.remain() == 0:
return 'done'
else:
self.pending_buffer = buffer
self.pending_on_partial_copy = on_partial_copy
self.pending_on_copy_done = on_copy_done
return 'blocked'

class StreamEnd(Waitable):
stream: ReadableStream
Expand Down
28 changes: 26 additions & 2 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1457,8 +1457,19 @@ async def core_func1(task, args):
assert(mem1[retp+0] == wsi)
assert(mem1[retp+4] == 4)

[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 0, 0)
assert(ret == definitions.BLOCKED)

fut4.set_result(None)

[event] = await canon_waitable_set_wait(False, mem1, task, seti, retp)
assert(event == EventCode.STREAM_WRITE)
assert(mem1[retp+0] == wsi)
assert(mem1[retp+4] == 0)

[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 0, 0)
assert(ret == 0)

[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
[] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi)
[] = await canon_waitable_set_drop(task, seti)
Expand Down Expand Up @@ -1498,6 +1509,9 @@ async def core_func2(task, args):
fut2.set_result(None)
await task.on_block(fut3)

[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 0)
assert(ret == 0)

mem2[0:8] = bytes(8)
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
assert(ret == 2)
Expand All @@ -1508,9 +1522,19 @@ async def core_func2(task, args):

await task.on_block(fut4)

[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 0)
assert(ret == 0)

[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 0)
assert(ret == definitions.BLOCKED)

[event] = await canon_waitable_set_wait(False, mem2, task, seti, retp)
assert(event == EventCode.STREAM_READ)
assert(mem2[retp+0] == rsi)
p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False)
errctxi = 1
assert(ret == (definitions.CLOSED | errctxi))
assert(p2 == (definitions.CLOSED | errctxi))

[] = await canon_stream_close_readable(U8Type(), task, rsi)
[] = await canon_waitable_set_drop(task, seti)
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)
Expand Down

0 comments on commit 519b688

Please sign in to comment.