Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasi: improve stdin support for nonblocking, fix stdout #1542

Merged
merged 5 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 20 additions & 29 deletions imports/wasi_snapshot_preview1/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er

// Extract FS context, used in the body of the for loop for FS access.
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// Slice of events that are processed out of the loop (stdin subscribers).
var stdinSubs []*event
// Slice of events that are processed out of the loop (blocking stdin subscribers).
var blockingStdinSubs []*event
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the clock subscribers that have been already written back to outBuf.
Expand Down Expand Up @@ -131,12 +131,16 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
if fd < 0 {
return syscall.EBADF
}
if fd == internalsys.FdStdin {
// if the fd is Stdin, do not ack yet,
// append to a slice for delayed evaluation.
stdinSubs = append(stdinSubs, evt)
if file, ok := fsc.LookupFile(fd); !ok {
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf, evt)
readySubs++
continue
} else if fd == internalsys.FdStdin && !file.File.IsNonblock() {
// if the fd is Stdin, and it is in non-blocking mode,
// do not ack yet, append to a slice for delayed evaluation.
blockingStdinSubs = append(blockingStdinSubs, evt)
} else {
evt.errno = processFDEventRead(fsc, fd)
writeEvent(outBuf, evt)
readySubs++
}
Expand All @@ -145,7 +149,11 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
if fd < 0 {
return syscall.EBADF
}
evt.errno = processFDEventWrite(fsc, fd)
if _, ok := fsc.LookupFile(fd); ok {
evt.errno = wasip1.ErrnoNotsup
} else {
evt.errno = wasip1.ErrnoBadf
}
readySubs++
writeEvent(outBuf, evt)
default:
Expand All @@ -159,8 +167,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
timeout = 0
}

// If there are stdin subscribers, check for data with given timeout.
if len(stdinSubs) > 0 {
// If there are blocking stdin subscribers, check for data with given timeout.
if len(blockingStdinSubs) > 0 {
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return syscall.EBADF
Expand All @@ -172,9 +180,9 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) syscall.Er
}
if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range stdinSubs {
for i := range blockingStdinSubs {
readySubs++
evt := stdinSubs[i]
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf, evt)
}
Expand Down Expand Up @@ -223,23 +231,6 @@ func processClockEvent(inBuf []byte) (time.Duration, syscall.Errno) {
}
}

// processFDEventRead returns ErrnoSuccess if the file exists and ErrnoBadf otherwise.
func processFDEventRead(fsc *internalsys.FSContext, fd int32) wasip1.Errno {
if _, ok := fsc.LookupFile(fd); ok {
return wasip1.ErrnoSuccess
} else {
return wasip1.ErrnoBadf
}
}

// processFDEventWrite returns ErrnoNotsup if the file exists and ErrnoBadf otherwise.
func processFDEventWrite(fsc *internalsys.FSContext, fd int32) wasip1.Errno {
if _, ok := fsc.LookupFile(fd); ok {
return wasip1.ErrnoNotsup
}
return wasip1.ErrnoBadf
}

// writeEvent writes the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
func writeEvent(outBuf []byte, evt *event) {
Expand Down
21 changes: 21 additions & 0 deletions imports/wasi_snapshot_preview1/testdata/gotip/wasi.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"sync"
"syscall"
"time"
)

func main() {
Expand All @@ -24,7 +25,12 @@ func main() {
if err := mainNonblock(os.Args[2], os.Args[3:]); err != nil {
panic(err)
}
case "stdin":
if err := mainStdin(); err != nil {
panic(err)
}
}

}

// mainSock is an explicit test of a blocking socket.
Expand Down Expand Up @@ -160,3 +166,18 @@ func mainNonblock(mode string, files []string) error {
wg.Wait()
return nil
}

// Reproducer for https://github.com/tetratelabs/wazero/issues/1538
func mainStdin() error {
go func() {
time.Sleep(1 * time.Second)
os.Stdout.WriteString("waiting for stdin...\n")
}()

b, err := io.ReadAll(os.Stdin)
if err != nil {
return err
}
os.Stdout.Write(b)
return nil
}
36 changes: 36 additions & 0 deletions imports/wasi_snapshot_preview1/wasi_stdlib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/fs"
"net"
"net/http"
"os"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -457,3 +458,38 @@ func testHTTP(t *testing.T, bin []byte) {
console := <-ch
require.Equal(t, "", console)
}

func Test_Stdin(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Nonblocking stdio is not supported on wasip1+windows.")
}
toolchains := map[string][]byte{}
if wasmGotip != nil {
toolchains["gotip"] = wasmGotip
}

for toolchain, bin := range toolchains {
toolchain := toolchain
bin := bin
t.Run(toolchain, func(t *testing.T) {
testStdin(t, bin)
})
}
}

func testStdin(t *testing.T, bin []byte) {
r, w, err := os.Pipe()
require.NoError(t, err)
moduleConfig := wazero.NewModuleConfig().
WithSysWalltime().WithSysNanotime(). // HTTP middleware uses both clocks
WithArgs("wasi", "stdin").
WithStdin(r).WithStdout(os.Stdout)
ch := make(chan string, 1)
go func() {
ch <- compileAndRun(t, testCtx, moduleConfig, bin)
}()
time.Sleep(1 * time.Second)
_, _ = w.WriteString("foo")
s := <-ch
require.Equal(t, "waiting for stdin...\nfoo", s)
}