Skip to content

Commit

Permalink
wasi: improve stdin support for nonblocking, fix stdout (#1542)
Browse files Browse the repository at this point in the history
Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com>
  • Loading branch information
evacchi authored Jun 28, 2023
1 parent df42824 commit 39f2ff2
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 35 deletions.
12 changes: 6 additions & 6 deletions imports/wasi_snapshot_preview1/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,41 +423,41 @@ func Test_fdFdstatGet_StdioNonblock(t *testing.T) {
fd: sys.FdStdin,
expectedMemory: []byte{
0, 0, // fs_filetype
4, 0, 0, 0, 0, 0, // fs_flags
5, 0, 0, 0, 0, 0, // fs_flags
0xff, 0x1, 0xe0, 0x8, 0x0, 0x0, 0x0, 0x0, // fs_rights_base
0, 0, 0, 0, 0, 0, 0, 0, // fs_rights_inheriting
},
expectedLog: `
==> wasi_snapshot_preview1.fd_fdstat_get(fd=0)
<== (stat={filetype=UNKNOWN,fdflags=NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
<== (stat={filetype=UNKNOWN,fdflags=APPEND|NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
`,
},
{
name: "stdout",
fd: sys.FdStdout,
expectedMemory: []byte{
0, 0, // fs_filetype
4, 0, 0, 0, 0, 0, // fs_flags
5, 0, 0, 0, 0, 0, // fs_flags
0xff, 0x1, 0xe0, 0x8, 0x0, 0x0, 0x0, 0x0, // fs_rights_base
0, 0, 0, 0, 0, 0, 0, 0, // fs_rights_inheriting
},
expectedLog: `
==> wasi_snapshot_preview1.fd_fdstat_get(fd=1)
<== (stat={filetype=UNKNOWN,fdflags=NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
<== (stat={filetype=UNKNOWN,fdflags=APPEND|NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
`,
},
{
name: "stderr",
fd: sys.FdStderr,
expectedMemory: []byte{
0, 0, // fs_filetype
4, 0, 0, 0, 0, 0, // fs_flags
5, 0, 0, 0, 0, 0, // fs_flags
0xff, 0x1, 0xe0, 0x8, 0x0, 0x0, 0x0, 0x0, // fs_rights_base
0, 0, 0, 0, 0, 0, 0, 0, // fs_rights_inheriting
},
expectedLog: `
==> wasi_snapshot_preview1.fd_fdstat_get(fd=2)
<== (stat={filetype=UNKNOWN,fdflags=NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
<== (stat={filetype=UNKNOWN,fdflags=APPEND|NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
`,
},
}
Expand Down
49 changes: 20 additions & 29 deletions imports/wasi_snapshot_preview1/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er

// Extract FS context, used in the body of the for loop for FS access.
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// Slice of events that are processed out of the loop (stdin subscribers).
var stdinSubs []*event
// Slice of events that are processed out of the loop (blocking stdin subscribers).
var blockingStdinSubs []*event
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the clock subscribers that have been already written back to outBuf.
Expand Down Expand Up @@ -131,12 +131,16 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
if fd < 0 {
return syscall.EBADF
}
if fd == internalsys.FdStdin {
// if the fd is Stdin, do not ack yet,
// append to a slice for delayed evaluation.
stdinSubs = append(stdinSubs, evt)
if file, ok := fsc.LookupFile(fd); !ok {
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf, evt)
readySubs++
continue
} else if fd == internalsys.FdStdin && !file.File.IsNonblock() {
// if the fd is Stdin, and it is in non-blocking mode,
// do not ack yet, append to a slice for delayed evaluation.
blockingStdinSubs = append(blockingStdinSubs, evt)
} else {
evt.errno = processFDEventRead(fsc, fd)
writeEvent(outBuf, evt)
readySubs++
}
Expand All @@ -145,7 +149,11 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
if fd < 0 {
return syscall.EBADF
}
evt.errno = processFDEventWrite(fsc, fd)
if _, ok := fsc.LookupFile(fd); ok {
evt.errno = wasip1.ErrnoNotsup
} else {
evt.errno = wasip1.ErrnoBadf
}
readySubs++
writeEvent(outBuf, evt)
default:
Expand All @@ -159,8 +167,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
timeout = 0
}

// If there are stdin subscribers, check for data with given timeout.
if len(stdinSubs) > 0 {
// If there are blocking stdin subscribers, check for data with given timeout.
if len(blockingStdinSubs) > 0 {
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return syscall.EBADF
Expand All @@ -172,9 +180,9 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
}
if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range stdinSubs {
for i := range blockingStdinSubs {
readySubs++
evt := stdinSubs[i]
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf, evt)
}
Expand Down Expand Up @@ -223,23 +231,6 @@ func processClockEvent(inBuf []byte) (time.Duration, syscall.Errno) {
}
}

// processFDEventRead returns ErrnoSuccess if the file exists and ErrnoBadf otherwise.
func processFDEventRead(fsc *internalsys.FSContext, fd int32) wasip1.Errno {
if _, ok := fsc.LookupFile(fd); ok {
return wasip1.ErrnoSuccess
} else {
return wasip1.ErrnoBadf
}
}

// processFDEventWrite returns ErrnoNotsup if the file exists and ErrnoBadf otherwise.
func processFDEventWrite(fsc *internalsys.FSContext, fd int32) wasip1.Errno {
if _, ok := fsc.LookupFile(fd); ok {
return wasip1.ErrnoNotsup
}
return wasip1.ErrnoBadf
}

// writeEvent writes the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
func writeEvent(outBuf []byte, evt *event) {
Expand Down
27 changes: 27 additions & 0 deletions imports/wasi_snapshot_preview1/testdata/gotip/wasi.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"sync"
"syscall"
"time"
)

func main() {
Expand All @@ -24,7 +25,14 @@ func main() {
if err := mainNonblock(os.Args[2], os.Args[3:]); err != nil {
panic(err)
}
case "stdin":
if err := mainStdin(); err != nil {
panic(err)
}
case "stdout":
mainStdout()
}

}

// mainSock is an explicit test of a blocking socket.
Expand Down Expand Up @@ -160,3 +168,22 @@ func mainNonblock(mode string, files []string) error {
wg.Wait()
return nil
}

// Reproducer for https://github.com/tetratelabs/wazero/issues/1538
func mainStdin() error {
go func() {
time.Sleep(1 * time.Second)
os.Stdout.WriteString("waiting for stdin...\n")
}()

b, err := io.ReadAll(os.Stdin)
if err != nil {
return err
}
os.Stdout.Write(b)
return nil
}

func mainStdout() {
os.Stdout.WriteString("test")
}
Binary file not shown.
33 changes: 33 additions & 0 deletions imports/wasi_snapshot_preview1/wasi_stdlib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/fs"
"net"
"net/http"
"os"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -457,3 +458,35 @@ func testHTTP(t *testing.T, bin []byte) {
console := <-ch
require.Equal(t, "", console)
}

func Test_Stdin(t *testing.T) {
toolchains := map[string][]byte{}
if wasmGotip != nil {
toolchains["gotip"] = wasmGotip
}

for toolchain, bin := range toolchains {
toolchain := toolchain
bin := bin
t.Run(toolchain, func(t *testing.T) {
testStdin(t, bin)
})
}
}

func testStdin(t *testing.T, bin []byte) {
r, w, err := os.Pipe()
require.NoError(t, err)
moduleConfig := wazero.NewModuleConfig().
WithSysNanotime(). // poll_oneoff requires nanotime.
WithArgs("wasi", "stdin").
WithStdin(r)
ch := make(chan string, 1)
go func() {
ch <- compileAndRun(t, testCtx, moduleConfig, bin)
}()
time.Sleep(1 * time.Second)
_, _ = w.WriteString("foo")
s := <-ch
require.Equal(t, "waiting for stdin...\nfoo", s)
}
11 changes: 11 additions & 0 deletions internal/sysfs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ type stdioFile struct {
st fsapi.Stat_t
}

// SetAppend implements File.SetAppend
func (f *stdioFile) SetAppend(bool) syscall.Errno {
// Ignore for stdio.
return 0
}

// IsAppend implements File.SetAppend
func (f *stdioFile) IsAppend() bool {
return true
}

// IsDir implements File.IsDir
func (f *stdioFile) IsDir() (bool, syscall.Errno) {
return false, 0
Expand Down
10 changes: 10 additions & 0 deletions internal/sysfs/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ func TestFileSetAppend(t *testing.T) {
requireFileContent("wazero6789wazero")
}

func TestStdioFile_SetAppend(t *testing.T) {
// SetAppend should not affect Stdio.
file, err := NewStdioFile(false, os.Stdout)
require.NoError(t, err)
errno := file.SetAppend(true)
require.EqualErrno(t, 0, errno)
_, errno = file.Write([]byte{})
require.EqualErrno(t, 0, errno)
}

func TestFileIno(t *testing.T) {
tmpDir := t.TempDir()
dirFS, embedFS, mapFS := dirEmbedMapFS(t, tmpDir)
Expand Down

0 comments on commit 39f2ff2

Please sign in to comment.