Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasi: improve stdin support for nonblocking, fix stdout #1542

Merged
merged 5 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions imports/wasi_snapshot_preview1/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,41 +423,41 @@ func Test_fdFdstatGet_StdioNonblock(t *testing.T) {
fd: sys.FdStdin,
expectedMemory: []byte{
0, 0, // fs_filetype
4, 0, 0, 0, 0, 0, // fs_flags
5, 0, 0, 0, 0, 0, // fs_flags
0xff, 0x1, 0xe0, 0x8, 0x0, 0x0, 0x0, 0x0, // fs_rights_base
0, 0, 0, 0, 0, 0, 0, 0, // fs_rights_inheriting
},
expectedLog: `
==> wasi_snapshot_preview1.fd_fdstat_get(fd=0)
<== (stat={filetype=UNKNOWN,fdflags=NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
<== (stat={filetype=UNKNOWN,fdflags=APPEND|NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
`,
},
{
name: "stdout",
fd: sys.FdStdout,
expectedMemory: []byte{
0, 0, // fs_filetype
4, 0, 0, 0, 0, 0, // fs_flags
5, 0, 0, 0, 0, 0, // fs_flags
0xff, 0x1, 0xe0, 0x8, 0x0, 0x0, 0x0, 0x0, // fs_rights_base
0, 0, 0, 0, 0, 0, 0, 0, // fs_rights_inheriting
},
expectedLog: `
==> wasi_snapshot_preview1.fd_fdstat_get(fd=1)
<== (stat={filetype=UNKNOWN,fdflags=NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
<== (stat={filetype=UNKNOWN,fdflags=APPEND|NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
`,
},
{
name: "stderr",
fd: sys.FdStderr,
expectedMemory: []byte{
0, 0, // fs_filetype
4, 0, 0, 0, 0, 0, // fs_flags
5, 0, 0, 0, 0, 0, // fs_flags
0xff, 0x1, 0xe0, 0x8, 0x0, 0x0, 0x0, 0x0, // fs_rights_base
0, 0, 0, 0, 0, 0, 0, 0, // fs_rights_inheriting
},
expectedLog: `
==> wasi_snapshot_preview1.fd_fdstat_get(fd=2)
<== (stat={filetype=UNKNOWN,fdflags=NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
<== (stat={filetype=UNKNOWN,fdflags=APPEND|NONBLOCK,fs_rights_base=FD_DATASYNC|FD_READ|FD_SEEK|FDSTAT_SET_FLAGS|FD_SYNC|FD_TELL|FD_WRITE|FD_ADVISE|FD_ALLOCATE,fs_rights_inheriting=},errno=ESUCCESS)
`,
},
}
Expand Down
49 changes: 20 additions & 29 deletions imports/wasi_snapshot_preview1/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er

// Extract FS context, used in the body of the for loop for FS access.
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// Slice of events that are processed out of the loop (stdin subscribers).
var stdinSubs []*event
// Slice of events that are processed out of the loop (blocking stdin subscribers).
var blockingStdinSubs []*event
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the clock subscribers that have been already written back to outBuf.
Expand Down Expand Up @@ -131,12 +131,16 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
if fd < 0 {
return syscall.EBADF
}
if fd == internalsys.FdStdin {
// if the fd is Stdin, do not ack yet,
// append to a slice for delayed evaluation.
stdinSubs = append(stdinSubs, evt)
if file, ok := fsc.LookupFile(fd); !ok {
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf, evt)
readySubs++
continue
} else if fd == internalsys.FdStdin && !file.File.IsNonblock() {
// if the fd is Stdin, and it is in non-blocking mode,
// do not ack yet, append to a slice for delayed evaluation.
blockingStdinSubs = append(blockingStdinSubs, evt)
} else {
evt.errno = processFDEventRead(fsc, fd)
writeEvent(outBuf, evt)
readySubs++
}
Expand All @@ -145,7 +149,11 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
if fd < 0 {
return syscall.EBADF
}
evt.errno = processFDEventWrite(fsc, fd)
if _, ok := fsc.LookupFile(fd); ok {
evt.errno = wasip1.ErrnoNotsup
} else {
evt.errno = wasip1.ErrnoBadf
}
readySubs++
writeEvent(outBuf, evt)
default:
Expand All @@ -159,8 +167,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
timeout = 0
}

// If there are stdin subscribers, check for data with given timeout.
if len(stdinSubs) > 0 {
// If there are blocking stdin subscribers, check for data with given timeout.
if len(blockingStdinSubs) > 0 {
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return syscall.EBADF
Expand All @@ -172,9 +180,9 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
}
if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range stdinSubs {
for i := range blockingStdinSubs {
readySubs++
evt := stdinSubs[i]
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf, evt)
}
Expand Down Expand Up @@ -223,23 +231,6 @@ func processClockEvent(inBuf []byte) (time.Duration, syscall.Errno) {
}
}

// processFDEventRead returns ErrnoSuccess if the file exists and ErrnoBadf otherwise.
func processFDEventRead(fsc *internalsys.FSContext, fd int32) wasip1.Errno {
if _, ok := fsc.LookupFile(fd); ok {
return wasip1.ErrnoSuccess
} else {
return wasip1.ErrnoBadf
}
}

// processFDEventWrite returns ErrnoNotsup if the file exists and ErrnoBadf otherwise.
func processFDEventWrite(fsc *internalsys.FSContext, fd int32) wasip1.Errno {
if _, ok := fsc.LookupFile(fd); ok {
return wasip1.ErrnoNotsup
}
return wasip1.ErrnoBadf
}

// writeEvent writes the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
func writeEvent(outBuf []byte, evt *event) {
Expand Down
27 changes: 27 additions & 0 deletions imports/wasi_snapshot_preview1/testdata/gotip/wasi.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"sync"
"syscall"
"time"
)

func main() {
Expand All @@ -24,7 +25,14 @@ func main() {
if err := mainNonblock(os.Args[2], os.Args[3:]); err != nil {
panic(err)
}
case "stdin":
if err := mainStdin(); err != nil {
panic(err)
}
case "stdout":
mainStdout()
}

}

// mainSock is an explicit test of a blocking socket.
Expand Down Expand Up @@ -160,3 +168,22 @@ func mainNonblock(mode string, files []string) error {
wg.Wait()
return nil
}

// Reproducer for https://github.com/tetratelabs/wazero/issues/1538
func mainStdin() error {
go func() {
time.Sleep(1 * time.Second)
os.Stdout.WriteString("waiting for stdin...\n")
}()

b, err := io.ReadAll(os.Stdin)
if err != nil {
return err
}
os.Stdout.Write(b)
return nil
}

func mainStdout() {
os.Stdout.WriteString("test")
}
Binary file not shown.
33 changes: 33 additions & 0 deletions imports/wasi_snapshot_preview1/wasi_stdlib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/fs"
"net"
"net/http"
"os"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -457,3 +458,35 @@ func testHTTP(t *testing.T, bin []byte) {
console := <-ch
require.Equal(t, "", console)
}

func Test_Stdin(t *testing.T) {
toolchains := map[string][]byte{}
if wasmGotip != nil {
toolchains["gotip"] = wasmGotip
}

for toolchain, bin := range toolchains {
toolchain := toolchain
bin := bin
t.Run(toolchain, func(t *testing.T) {
testStdin(t, bin)
})
}
}

func testStdin(t *testing.T, bin []byte) {
r, w, err := os.Pipe()
require.NoError(t, err)
moduleConfig := wazero.NewModuleConfig().
WithSysNanotime(). // poll_oneoff requires nanotime.
WithArgs("wasi", "stdin").
WithStdin(r)
ch := make(chan string, 1)
go func() {
ch <- compileAndRun(t, testCtx, moduleConfig, bin)
}()
time.Sleep(1 * time.Second)
_, _ = w.WriteString("foo")
s := <-ch
require.Equal(t, "waiting for stdin...\nfoo", s)
}
11 changes: 11 additions & 0 deletions internal/sysfs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ type stdioFile struct {
st fsapi.Stat_t
}

// SetAppend implements File.SetAppend
func (f *stdioFile) SetAppend(bool) syscall.Errno {
// Ignore for stdio.
return 0
}

// IsAppend implements File.SetAppend
func (f *stdioFile) IsAppend() bool {
return true
}

// IsDir implements File.IsDir
func (f *stdioFile) IsDir() (bool, syscall.Errno) {
return false, 0
Expand Down
10 changes: 10 additions & 0 deletions internal/sysfs/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ func TestFileSetAppend(t *testing.T) {
requireFileContent("wazero6789wazero")
}

func TestStdioFile_SetAppend(t *testing.T) {
// SetAppend should not affect Stdio.
file, err := NewStdioFile(false, os.Stdout)
require.NoError(t, err)
errno := file.SetAppend(true)
require.EqualErrno(t, 0, errno)
_, errno = file.Write([]byte{})
require.EqualErrno(t, 0, errno)
}

func TestFileIno(t *testing.T) {
tmpDir := t.TempDir()
dirFS, embedFS, mapFS := dirEmbedMapFS(t, tmpDir)
Expand Down