From 6f21e2ecf09e194ad39687e80d65afc52960cef2 Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Wed, 29 Jun 2022 23:30:47 +0200 Subject: [PATCH] Add polling and networking for target_os = "wasi" Based on https://github.com/tokio-rs/mio/pull/1395 And with * https://github.com/bytecodealliance/wasmtime/pull/3711 * https://github.com/rust-lang/rust/pull/93158 merged, mio can have limited support for networking for the `wasm32-wasi` target. Co-authored-by: Thomas de Zeeuw Signed-off-by: Harald Hoyer --- Cargo.toml | 4 + src/io_source.rs | 40 +++++ src/lib.rs | 2 + src/net/mod.rs | 2 + src/net/tcp/listener.rs | 31 ++++ src/net/tcp/stream.rs | 31 ++++ src/poll.rs | 4 +- src/sys/mod.rs | 6 + src/sys/shell/mod.rs | 2 + src/sys/shell/selector.rs | 22 ++- src/sys/shell/tcp.rs | 4 + src/sys/shell/udp.rs | 1 + src/sys/wasi/mod.rs | 356 ++++++++++++++++++++++++++++++++++++++ 13 files changed, 503 insertions(+), 2 deletions(-) create mode 100644 src/sys/wasi/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 262ab6ba1..4f9cf7c1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,10 @@ miow = "0.3.6" winapi = { version = "0.3", features = ["winsock2", "mswsock"] } ntapi = "0.3" +[target.'cfg(target_os = "wasi")'.dependencies] +wasi = "0.11.0" +libc = "0.2.86" + [dev-dependencies] env_logger = { version = "0.8.4", default-features = false } rand = "0.8" diff --git a/src/io_source.rs b/src/io_source.rs index 3ec7276f1..eebdeaa50 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -1,6 +1,8 @@ use std::ops::{Deref, DerefMut}; #[cfg(unix)] use std::os::unix::io::AsRawFd; +#[cfg(target_os = "wasi")] +use std::os::wasi::io::AsRawFd; #[cfg(windows)] use std::os::windows::io::AsRawSocket; #[cfg(target_os = "hermit")] @@ -204,6 +206,44 @@ where } } +#[cfg(target_os = "wasi")] +impl event::Source for IoSource +where + T: AsRawFd, +{ + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.associate(registry)?; + registry + .selector() + .register(self.inner.as_raw_fd() as _, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.check_association(registry)?; + registry + .selector() + .reregister(self.inner.as_raw_fd() as _, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + #[cfg(debug_assertions)] + self.selector_id.remove_association(registry)?; + registry.selector().deregister(self.inner.as_raw_fd() as _) + } +} + #[cfg(target_os = "hermit")] impl event::Source for IoSource where diff --git a/src/lib.rs b/src/lib.rs index 2dc1ca6ad..85b9cdce1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,6 +52,7 @@ mod interest; mod poll; mod sys; mod token; +#[cfg(not(target_os = "wasi"))] mod waker; pub mod event; @@ -69,6 +70,7 @@ pub use event::Events; pub use interest::Interest; pub use poll::{Poll, Registry}; pub use token::Token; +#[cfg(not(target_os = "wasi"))] pub use waker::Waker; #[cfg(all(unix, feature = "os-ext"))] diff --git a/src/net/mod.rs b/src/net/mod.rs index c8cef17e9..7d714ca00 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -28,7 +28,9 @@ mod tcp; pub use self::tcp::{TcpListener, TcpStream}; +#[cfg(not(target_os = "wasi"))] mod udp; +#[cfg(not(target_os = "wasi"))] pub use self::udp::UdpSocket; #[cfg(unix)] diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 9b1eaa40c..ef0823246 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,6 +1,8 @@ use std::net::{self, SocketAddr}; #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +#[cfg(target_os = "wasi")] +use std::os::wasi::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; #[cfg(windows)] use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; #[cfg(target_os = "hermit")] @@ -13,6 +15,7 @@ use crate::io_source::IoSource; use crate::net::TcpStream; #[cfg(unix)] use crate::sys::tcp::set_reuseaddr; +#[cfg(not(target_os = "wasi"))] use crate::sys::tcp::{bind, listen, new_for_addr}; use crate::{event, sys, Interest, Registry, Token}; @@ -54,6 +57,7 @@ impl TcpListener { /// 2. Set the `SO_REUSEADDR` option on the socket on Unix. /// 3. Bind the socket to the specified address. /// 4. Calls `listen` on the socket to prepare it to receive new connections. + #[cfg(not(target_os = "wasi"))] pub fn bind(addr: SocketAddr) -> io::Result { let socket = new_for_addr(addr)?; #[cfg(unix)] @@ -243,3 +247,30 @@ impl FromAbi for TcpListener { TcpListener::from_std(FromAbi::from_abi(socket)) } } + +#[cfg(target_os = "wasi")] +impl IntoRawFd for TcpListener { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +#[cfg(target_os = "wasi")] +impl AsRawFd for TcpListener { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(target_os = "wasi")] +impl FromRawFd for TcpListener { + /// Converts a `RawFd` to a `TcpListener`. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the socket is in + /// non-blocking mode. + unsafe fn from_raw_fd(fd: RawFd) -> TcpListener { + TcpListener::from_std(FromRawFd::from_raw_fd(fd)) + } +} diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 447eef815..1ca64cda6 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -3,6 +3,8 @@ use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::net::{self, Shutdown, SocketAddr}; #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +#[cfg(target_os = "wasi")] +use std::os::wasi::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; #[cfg(windows)] use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; #[cfg(target_os = "hermit")] @@ -11,6 +13,7 @@ use std::os::hermit::abi; use std::os::hermit::io::{AsAbi, FromAbi, IntoAbi}; use crate::io_source::IoSource; +#[cfg(not(target_os = "wasi"))] use crate::sys::tcp::{connect, new_for_addr}; use crate::{event, Interest, Registry, Token}; @@ -77,6 +80,7 @@ impl TcpStream { /// 5. Now the stream can be used. /// /// [read interest]: Interest::READABLE + #[cfg(not(target_os = "wasi"))] pub fn connect(addr: SocketAddr) -> io::Result { let socket = new_for_addr(addr)?; #[cfg(unix)] @@ -362,3 +366,30 @@ impl FromAbi for TcpStream { TcpStream::from_std(FromAbi::from_abi(socket)) } } + +#[cfg(target_os = "wasi")] +impl IntoRawFd for TcpStream { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +#[cfg(target_os = "wasi")] +impl AsRawFd for TcpStream { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(target_os = "wasi")] +impl FromRawFd for TcpStream { + /// Converts a `RawFd` to a `TcpStream`. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the socket is in + /// non-blocking mode. + unsafe fn from_raw_fd(fd: RawFd) -> TcpStream { + TcpStream::from_std(FromRawFd::from_raw_fd(fd)) + } +} diff --git a/src/poll.rs b/src/poll.rs index eb89ea6d9..2e0f3e3d9 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -635,6 +635,7 @@ impl Registry { /// /// Event sources registered with this `Registry` will be registered with /// the original `Registry` and `Poll` instance. + #[cfg(not(target_os = "wasi"))] pub fn try_clone(&self) -> io::Result { self.selector .try_clone() @@ -643,7 +644,7 @@ impl Registry { /// Internal check to ensure only a single `Waker` is active per [`Poll`] /// instance. - #[cfg(debug_assertions)] + #[cfg(all(debug_assertions, not(target_os = "wasi")))] pub(crate) fn register_waker(&self) { assert!( !self.selector.register_waker(), @@ -652,6 +653,7 @@ impl Registry { } /// Get access to the `sys::Selector`. + #[cfg(any(not(target_os = "wasi"), feature = "net"))] pub(crate) fn selector(&self) -> &sys::Selector { &self.selector } diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 6cc647ffe..7cbbc0e91 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -70,6 +70,12 @@ cfg_os_poll! { pub use self::windows::*; } +#[cfg(target_os = "wasi")] +cfg_os_poll! { + mod wasi; + pub(crate) use self::wasi::*; +} + cfg_not_os_poll! { mod shell; pub(crate) use self::shell::*; diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs index 7e1533f45..8a3175f76 100644 --- a/src/sys/shell/mod.rs +++ b/src/sys/shell/mod.rs @@ -7,7 +7,9 @@ macro_rules! os_required { mod selector; pub(crate) use self::selector::{event, Event, Events, Selector}; +#[cfg(not(target_os = "wasi"))] mod waker; +#[cfg(not(target_os = "wasi"))] pub(crate) use self::waker::Waker; cfg_net! { diff --git a/src/sys/shell/selector.rs b/src/sys/shell/selector.rs index 91fc0bf47..032631f3a 100644 --- a/src/sys/shell/selector.rs +++ b/src/sys/shell/selector.rs @@ -11,6 +11,7 @@ pub type Events = Vec; pub struct Selector {} impl Selector { + #[cfg(not(target_os = "wasi"))] pub fn try_clone(&self) -> io::Result { os_required!(); } @@ -19,7 +20,7 @@ impl Selector { os_required!(); } - #[cfg(debug_assertions)] + #[cfg(all(debug_assertions, not(target_os = "wasi")))] pub fn register_waker(&self) -> bool { os_required!(); } @@ -44,6 +45,25 @@ cfg_any_os_ext! { } } +#[cfg(target_os = "wasi")] +cfg_any_os_ext! { + use crate::{Interest, Token}; + + impl Selector { + pub fn register(&self, _: wasi::Fd, _: Token, _: Interest) -> io::Result<()> { + os_required!(); + } + + pub fn reregister(&self, _: wasi::Fd, _: Token, _: Interest) -> io::Result<()> { + os_required!(); + } + + pub fn deregister(&self, _: wasi::Fd) -> io::Result<()> { + os_required!(); + } + } +} + cfg_io_source! { #[cfg(debug_assertions)] impl Selector { diff --git a/src/sys/shell/tcp.rs b/src/sys/shell/tcp.rs index 60dfe70f6..260763aeb 100644 --- a/src/sys/shell/tcp.rs +++ b/src/sys/shell/tcp.rs @@ -1,18 +1,22 @@ use std::io; use std::net::{self, SocketAddr}; +#[cfg(not(target_os = "wasi"))] pub(crate) fn new_for_addr(_: SocketAddr) -> io::Result { os_required!(); } +#[cfg(not(target_os = "wasi"))] pub(crate) fn bind(_: &net::TcpListener, _: SocketAddr) -> io::Result<()> { os_required!(); } +#[cfg(not(target_os = "wasi"))] pub(crate) fn connect(_: &net::TcpStream, _: SocketAddr) -> io::Result<()> { os_required!(); } +#[cfg(not(target_os = "wasi"))] pub(crate) fn listen(_: &net::TcpListener, _: u32) -> io::Result<()> { os_required!(); } diff --git a/src/sys/shell/udp.rs b/src/sys/shell/udp.rs index 48ccac740..6a48b6941 100644 --- a/src/sys/shell/udp.rs +++ b/src/sys/shell/udp.rs @@ -1,3 +1,4 @@ +#![cfg(not(target_os = "wasi"))] use std::io; use std::net::{self, SocketAddr}; diff --git a/src/sys/wasi/mod.rs b/src/sys/wasi/mod.rs new file mode 100644 index 000000000..f6a2aa02b --- /dev/null +++ b/src/sys/wasi/mod.rs @@ -0,0 +1,356 @@ +//! # Notes +//! +//! The current implementation is somewhat limited. The `Waker` is not +//! implemented, as at the time of writing there is no way to support to wake-up +//! a thread from calling `poll_oneoff`. +//! +//! Furthermore the (re/de)register functions also don't work while concurrently +//! polling as both registering and polling requires a lock on the +//! `subscriptions`. +//! +//! Finally `Selector::try_clone`, required by `Registry::try_clone`, doesn't +//! work. However this could be implemented by use of an `Arc`. +//! +//! In summary, this only (barely) works using a single thread. + +use std::cmp::min; +use std::io; +#[cfg(all(feature = "net", debug_assertions))] +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +#[cfg(feature = "net")] +use crate::{Interest, Token}; + +cfg_net! { + pub(crate) mod tcp { + use std::io; + use std::net::{self, SocketAddr}; + + pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> { + let (stream, addr) = listener.accept()?; + stream.set_nonblocking(true)?; + Ok((stream, addr)) + } + } +} + +/// Unique id for use as `SelectorId`. +#[cfg(all(debug_assertions, feature = "net"))] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +pub(crate) struct Selector { + #[cfg(all(debug_assertions, feature = "net"))] + id: usize, + /// Subscriptions (reads events) we're interested in. + subscriptions: Arc>>, +} + +impl Selector { + pub(crate) fn new() -> io::Result { + Ok(Selector { + #[cfg(all(debug_assertions, feature = "net"))] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + subscriptions: Arc::new(Mutex::new(Vec::new())), + }) + } + + #[cfg(all(debug_assertions, feature = "net"))] + pub(crate) fn id(&self) -> usize { + self.id + } + + pub(crate) fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + events.clear(); + + let mut subscriptions = self.subscriptions.lock().unwrap(); + + // If we want to a use a timeout in the `wasi_poll_oneoff()` function + // we need another subscription to the list. + if let Some(timeout) = timeout { + subscriptions.push(timeout_subscription(timeout)); + } + + // `poll_oneoff` needs the same number of events as subscriptions. + let length = subscriptions.len(); + events.reserve(length); + + debug_assert!(events.capacity() >= length); + + let res = unsafe { wasi::poll_oneoff(subscriptions.as_ptr(), events.as_mut_ptr(), length) }; + + // Remove the timeout subscription we possibly added above. + if timeout.is_some() { + let timeout_sub = subscriptions.pop(); + debug_assert_eq!( + timeout_sub.unwrap().u.tag, + wasi::EVENTTYPE_CLOCK.raw(), + "failed to remove timeout subscription" + ); + } + + drop(subscriptions); // Unlock. + + match res { + Ok(n_events) => { + // Safety: `poll_oneoff` initialises the `events` for us. + unsafe { events.set_len(n_events) }; + + // Remove the timeout event. + if timeout.is_some() { + if let Some(index) = events.iter().position(is_timeout_event) { + events.swap_remove(index); + } + } + + check_errors(&events) + } + Err(err) => Err(io_err(err)), + } + } + + #[cfg(feature = "net")] + pub(crate) fn register( + &self, + fd: wasi::Fd, + token: Token, + interests: Interest, + ) -> io::Result<()> { + let mut subscriptions = self.subscriptions.lock().unwrap(); + + if interests.is_writable() { + let subscription = wasi::Subscription { + userdata: token.0 as wasi::Userdata, + u: wasi::SubscriptionU { + tag: wasi::EVENTTYPE_FD_WRITE.raw(), + u: wasi::SubscriptionUU { + fd_write: wasi::SubscriptionFdReadwrite { + file_descriptor: fd, + }, + }, + }, + }; + subscriptions.push(subscription); + } + + if interests.is_readable() { + let subscription = wasi::Subscription { + userdata: token.0 as wasi::Userdata, + u: wasi::SubscriptionU { + tag: wasi::EVENTTYPE_FD_READ.raw(), + u: wasi::SubscriptionUU { + fd_read: wasi::SubscriptionFdReadwrite { + file_descriptor: fd, + }, + }, + }, + }; + subscriptions.push(subscription); + } + + Ok(()) + } + + #[cfg(feature = "net")] + pub(crate) fn reregister( + &self, + fd: wasi::Fd, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.deregister(fd) + .and_then(|()| self.register(fd, token, interests)) + } + + #[cfg(feature = "net")] + pub(crate) fn deregister(&self, fd: wasi::Fd) -> io::Result<()> { + let mut subscriptions = self.subscriptions.lock().unwrap(); + + let predicate = |subscription: &wasi::Subscription| { + // Safety: `subscription.u.tag` defines the type of the union in + // `subscription.u.u`. + match subscription.u.tag { + t if t == wasi::EVENTTYPE_FD_WRITE.raw() => unsafe { + subscription.u.u.fd_write.file_descriptor == fd + }, + t if t == wasi::EVENTTYPE_FD_READ.raw() => unsafe { + subscription.u.u.fd_read.file_descriptor == fd + }, + _ => false, + } + }; + + let mut ret = Err(io::ErrorKind::NotFound.into()); + + while let Some(index) = subscriptions.iter().position(predicate) { + subscriptions.swap_remove(index); + ret = Ok(()) + } + + ret + } +} + +/// Token used to a add a timeout subscription, also used in removing it again. +const TIMEOUT_TOKEN: wasi::Userdata = wasi::Userdata::max_value(); + +/// Returns a `wasi::Subscription` for `timeout`. +fn timeout_subscription(timeout: Duration) -> wasi::Subscription { + wasi::Subscription { + userdata: TIMEOUT_TOKEN, + u: wasi::SubscriptionU { + tag: wasi::EVENTTYPE_CLOCK.raw(), + u: wasi::SubscriptionUU { + clock: wasi::SubscriptionClock { + id: wasi::CLOCKID_MONOTONIC, + // Timestamp is in nanoseconds. + timeout: min(wasi::Timestamp::MAX as u128, timeout.as_nanos()) + as wasi::Timestamp, + // Give the implementation another millisecond to coalesce + // events. + precision: Duration::from_millis(1).as_nanos() as wasi::Timestamp, + // Zero means the `timeout` is considered relative to the + // current time. + flags: 0, + }, + }, + }, + } +} + +fn is_timeout_event(event: &wasi::Event) -> bool { + event.type_ == wasi::EVENTTYPE_CLOCK && event.userdata == TIMEOUT_TOKEN +} + +/// Check all events for possible errors, it returns the first error found. +fn check_errors(events: &[Event]) -> io::Result<()> { + for event in events { + if event.error != wasi::ERRNO_SUCCESS { + return Err(io_err(event.error)); + } + } + Ok(()) +} + +/// Convert `wasi::Errno` into an `io::Error`. +fn io_err(errno: wasi::Errno) -> io::Error { + // TODO: check if this is valid. + io::Error::from_raw_os_error(errno.raw() as i32) +} + +pub(crate) type Events = Vec; + +pub(crate) type Event = wasi::Event; + +pub(crate) mod event { + use std::fmt; + + use crate::sys::Event; + use crate::Token; + + pub(crate) fn token(event: &Event) -> Token { + Token(event.userdata as usize) + } + + pub(crate) fn is_readable(event: &Event) -> bool { + event.type_ == wasi::EVENTTYPE_FD_READ + } + + pub(crate) fn is_writable(event: &Event) -> bool { + event.type_ == wasi::EVENTTYPE_FD_WRITE + } + + pub(crate) fn is_error(_: &Event) -> bool { + // Not supported? It could be that `wasi::Event.error` could be used for + // this, but the docs say `error that occurred while processing the + // subscription request`, so it's checked in `Select::select` already. + false + } + + pub(crate) fn is_read_closed(event: &Event) -> bool { + event.type_ == wasi::EVENTTYPE_FD_READ + // Safety: checked the type of the union above. + && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 + } + + pub(crate) fn is_write_closed(event: &Event) -> bool { + event.type_ == wasi::EVENTTYPE_FD_WRITE + // Safety: checked the type of the union above. + && (event.fd_readwrite.flags & wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP) != 0 + } + + pub(crate) fn is_priority(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn is_aio(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub(crate) fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + debug_detail!( + TypeDetails(wasi::Eventtype), + PartialEq::eq, + wasi::EVENTTYPE_CLOCK, + wasi::EVENTTYPE_FD_READ, + wasi::EVENTTYPE_FD_WRITE, + ); + + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_flag(got: &wasi::Eventrwflags, want: &wasi::Eventrwflags) -> bool { + (got & want) != 0 + } + debug_detail!( + EventrwflagsDetails(wasi::Eventrwflags), + check_flag, + wasi::EVENTRWFLAGS_FD_READWRITE_HANGUP, + ); + + struct EventFdReadwriteDetails(wasi::EventFdReadwrite); + + impl fmt::Debug for EventFdReadwriteDetails { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EventFdReadwrite") + .field("nbytes", &self.0.nbytes) + .field("flags", &self.0.flags) + .finish() + } + } + + f.debug_struct("Event") + .field("userdata", &event.userdata) + .field("error", &event.error) + .field("type", &TypeDetails(event.type_)) + .field("fd_readwrite", &EventFdReadwriteDetails(event.fd_readwrite)) + .finish() + } +} + +cfg_os_poll! { + cfg_io_source! { + pub(crate) struct IoSourceState; + + impl IoSourceState { + pub(crate) fn new() -> IoSourceState { + IoSourceState + } + + pub(crate) fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } + } + } +}