Skip to content

Commit

Permalink
lightway-server: Use i/o uring for all i/o, not just tun.
Browse files Browse the repository at this point in the history
This does not consistently improve performance but reduces CPU overheads (by
around 50%-100% i.e. half to one core) under heavy traffic, which adding
perhaps a few hundred Mbps to a speedtest.net download test and making
negligible difference to the upload test. It also removes about 1ms from the
latency in the same tests. Finally the STDEV across multiple test runs appears
to be lower.

This appears to be due to a combination of avoiding async runtime overheads, as
well as removing various channels/queues in favour of a more direct model of
interaction between the ring and the connections.

As well as those benefits we are now able to reach the same level of
performance with far fewer slots used for the TUN rx path, here we use 64 slots
(by default) and reach the same performance as using 1024 previously. The way
uring handles blocking vs async for tun devices seems to be non-optimal. In
blocking mode things are very slow. In async mode more and more time is spent
on bookkeeping and polling, as the number of slots is increased, plus a high
level of EAGAIN results (due to a request timing out after multiple failed
polls[^0]) which waste time requeueing. This is related to
axboe/liburing#886 and
axboe/liburing#239.

For UDP/TCP sockets io uring behaves well with the socket in blocking mode
which avoids processing lots of EAGAIN results.

Tuning the slots for each I/O path is a bit of an art (more is definitely not
always better) and the sweet spot varies depending on the I/O device, so
provide various tunables instead of just splitting the ring evenly. With this
there's no real reason to have a very large ring, it's the number of inflight
requests which matters.

This is specific to the server since it relies on kernel features and
correctness(/lack of bugs) which may not be upheld on an arbitrary client
system (while it is assumed that server operators have more control over what
they run). It is also not portable to non-Linux systems. It is known to work
with Linux 6.1 (as found in Debian 12 AKA bookworm).

Note that this kernel version contains a bug which causes the `iou-sqp-*`
kernel thread to get stuck (unkillable) if the tun is in blocking mode,
therefore an option is provided. Enabling that option on a kernel which
contains [the fix][] allows equivalent performance with fewer slots on the
ring.

[^0]: When data becomes available _all_ requests are woken but only one will
      find data, the rest will see EAGAIN and after a certain number of such
      events I/O uring will propagate this back to userspace.
[the fix]: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=438b406055cd21105aad77db7938ee4720b09bee
  • Loading branch information
xv-ian-c committed Dec 5, 2024
1 parent 5ce3cf8 commit bc5fd2a
Show file tree
Hide file tree
Showing 20 changed files with 1,906 additions and 446 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ clap = { version = "4.4.7", features = ["derive"] }
ctrlc = { version = "3.4.2", features = ["termination"] }
delegate = "0.12.0"
educe = { version = "0.6.0", default-features = false, features = ["Debug"] }
io-uring = "0.7.0"
ipnet = { version = "2.8.0", features = ["serde"]}
libc = "0.2.152"
lightway-app-utils = { path = "./lightway-app-utils" }
Expand All @@ -52,3 +53,4 @@ tokio-util = "0.7.10"
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
twelf = { version = "0.15.0", default-features = false, features = ["env", "clap", "yaml"]}
tun = { version = "0.7.1" }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Protocol and design documentation can be found in the
Lightway rust implementation currently supports Linux OS. Both x86_64 and arm64 platforms are
supported and built as part of CI.

Support for other platforms will be added soon.
Support for other client platforms will be added soon.

## Development steps

Expand Down
5 changes: 3 additions & 2 deletions lightway-app-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ bytes.workspace = true
clap.workspace = true
fs-mistrust = { version = "0.8.0", default-features = false }
humantime = "2.1.0"
io-uring = { version = "0.7.0", optional = true }
io-uring = { workspace = true, optional = true }
ipnet.workspace = true
libc.workspace = true
lightway-core.workspace = true
Expand All @@ -38,11 +38,12 @@ tokio-stream = { workspace = true, optional = true }
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
tun = { version = "0.7", features = ["async"] }
tun = { workspace = true, features = ["async"] }

[[example]]
name = "udprelay"
path = "examples/udprelay.rs"
required-features = ["io-uring"]

[dev-dependencies]
async-trait.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions lightway-app-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ mod event_stream;
mod iouring;
mod tun;

mod net;
pub use net::{sockaddr_from_socket_addr, socket_addr_from_sockaddr};

#[cfg(feature = "tokio")]
pub use connection_ticker::{
connection_ticker_cb, ConnectionTicker, ConnectionTickerState, ConnectionTickerTask, Tickable,
Expand Down
179 changes: 179 additions & 0 deletions lightway-app-utils/src/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use std::{io, net::SocketAddr};

/// Convert from `libc::sockaddr_storage` to `std::net::SocketAddr`
#[allow(unsafe_code)]
pub fn socket_addr_from_sockaddr(
storage: &libc::sockaddr_storage,
len: libc::socklen_t,
) -> io::Result<SocketAddr> {
match storage.ss_family as libc::c_int {
libc::AF_INET => {
if (len as usize) < std::mem::size_of::<libc::sockaddr_in>() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid argument (inet len)",
));
}

// SAFETY: Casting from sockaddr_storage to sockaddr_in is safe since we have validated the len.
let addr =
unsafe { &*(storage as *const libc::sockaddr_storage as *const libc::sockaddr_in) };

let ip = u32::from_be(addr.sin_addr.s_addr);
let ip = std::net::Ipv4Addr::from_bits(ip);
let port = u16::from_be(addr.sin_port);

Ok((ip, port).into())
}
libc::AF_INET6 => {
if (len as usize) < std::mem::size_of::<libc::sockaddr_in6>() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid argument (inet6 len)",
));
}
// SAFETY: Casting from sockaddr_storage to sockaddr_in6 is safe since we have validated the len.
let addr = unsafe {
&*(storage as *const libc::sockaddr_storage as *const libc::sockaddr_in6)
};

let ip = u128::from_be_bytes(addr.sin6_addr.s6_addr);
let ip = std::net::Ipv6Addr::from_bits(ip);
let port = u16::from_be(addr.sin6_port);

Ok((ip, port).into())
}
_ => Err(io::Error::new(
std::io::ErrorKind::InvalidInput,
"invalid argument (ss_family)",
)),
}
}

/// Convert from `std::net::SocketAddr` to `libc::sockaddr_storage`+`libc::socklen_t`
#[allow(unsafe_code)]
pub fn sockaddr_from_socket_addr(addr: SocketAddr) -> (libc::sockaddr_storage, libc::socklen_t) {
// SAFETY: All zeroes is a valid sockaddr_storage
let mut storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() };

let len = match addr {
SocketAddr::V4(v4) => {
let p = &mut storage as *mut libc::sockaddr_storage as *mut libc::sockaddr_in;
// SAFETY: sockaddr_storage is defined to be big enough for any sockaddr_*.
unsafe {
p.write(libc::sockaddr_in {
sin_family: libc::AF_INET as _,
sin_port: v4.port().to_be(),
sin_addr: libc::in_addr {
s_addr: v4.ip().to_bits().to_be(),
},
sin_zero: Default::default(),
})
};
std::mem::size_of::<libc::sockaddr_in>() as libc::socklen_t
}
SocketAddr::V6(v6) => {
let p = &mut storage as *mut libc::sockaddr_storage as *mut libc::sockaddr_in6;
// SAFETY: sockaddr_storage is defined to be big enough for any sockaddr_*.
unsafe {
p.write(libc::sockaddr_in6 {
sin6_family: libc::AF_INET6 as _,
sin6_port: v6.port().to_be(),
sin6_flowinfo: v6.flowinfo().to_be(),
sin6_addr: libc::in6_addr {
s6_addr: v6.ip().to_bits().to_be_bytes(),
},
sin6_scope_id: v6.scope_id().to_be(),
})
};
std::mem::size_of::<libc::sockaddr_in6>() as libc::socklen_t
}
};

(storage, len)
}

#[cfg(test)]
mod tests {
#![allow(unsafe_code, clippy::undocumented_unsafe_blocks)]

use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
str::FromStr as _,
};

use super::*;

use test_case::test_case;

#[test]
fn socket_addr_from_sockaddr_unknown_af() {
// Test assumes these don't match the zero initialized
// libc::sockaddr_storage::ss_family.
assert_ne!(libc::AF_INET, 0);
assert_ne!(libc::AF_INET6, 0);

let storage = unsafe { std::mem::zeroed() };
let err =
socket_addr_from_sockaddr(&storage, std::mem::size_of::<libc::sockaddr_storage>() as _)
.unwrap_err();

assert!(matches!(err.kind(), std::io::ErrorKind::InvalidInput));
assert!(err.to_string().contains("invalid argument (ss_family)"));
}

#[test]
fn socket_addr_from_sockaddr_unknown_af_inet_short() {
let mut storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
storage.ss_family = libc::AF_INET as libc::sa_family_t;

let err = socket_addr_from_sockaddr(
&storage,
(std::mem::size_of::<libc::sockaddr_in>() - 1) as _,
)
.unwrap_err();

assert!(matches!(err.kind(), std::io::ErrorKind::InvalidInput));
assert!(err.to_string().contains("invalid argument (inet len)"));
}

#[test]
fn socket_addr_from_sockaddr_unknown_af_inet6_short() {
let mut storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
storage.ss_family = libc::AF_INET6 as libc::sa_family_t;

let err = socket_addr_from_sockaddr(
&storage,
(std::mem::size_of::<libc::sockaddr_in6>() - 1) as _,
)
.unwrap_err();

assert!(matches!(err.kind(), std::io::ErrorKind::InvalidInput));
assert!(err.to_string().contains("invalid argument (inet6 len)"));
}

#[test]
fn sockaddr_from_socket_addr_inet() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let (storage, len) = sockaddr_from_socket_addr(socket_addr);
assert_eq!(storage.ss_family, libc::AF_INET as libc::sa_family_t);
assert_eq!(len as usize, std::mem::size_of::<libc::sockaddr_in>());
}

#[test]
fn sockaddr_from_socket_addr_inet6() {
let socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 8080);
let (storage, len) = sockaddr_from_socket_addr(socket_addr);
assert_eq!(storage.ss_family, libc::AF_INET6 as libc::sa_family_t);
assert_eq!(len as usize, std::mem::size_of::<libc::sockaddr_in6>());
}

#[test_case("127.0.0.1:443")]
#[test_case("[::1]:8888")]
fn round_trip(addr: &str) {
let orig = SocketAddr::from_str(addr).unwrap();
let (storage, len) = sockaddr_from_socket_addr(orig);
let round_tripped = socket_addr_from_sockaddr(&storage, len).unwrap();
assert_eq!(orig, round_tripped)
}
}
5 changes: 3 additions & 2 deletions lightway-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ license = "AGPL-3.0-only"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["io-uring"]
default = []
debug = ["lightway-core/debug"]
io-uring = ["lightway-app-utils/io-uring"]

[lints]
workspace = true
Expand All @@ -26,6 +25,7 @@ clap.workspace = true
ctrlc.workspace = true
delegate.workspace = true
educe.workspace = true
io-uring.workspace = true
ipnet.workspace = true
jsonwebtoken = "9.3.0"
libc.workspace = true
Expand All @@ -48,6 +48,7 @@ tokio-stream = { workspace = true, features = ["time"] }
tracing.workspace = true
tracing-log = "0.2.0"
tracing-subscriber = { workspace = true, features = ["json"] }
tun.workspace = true
twelf.workspace = true

[dev-dependencies]
Expand Down
60 changes: 53 additions & 7 deletions lightway-server/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,25 @@ pub struct Config {
#[clap(long, default_value_t)]
pub enable_pqc: bool,

/// Enable IO-uring interface for Tunnel
#[clap(long, default_value_t)]
pub enable_tun_iouring: bool,

/// IO-uring submission queue count. Only applicable when
/// `enable_tun_iouring` is `true`
// Any value more than 1024 negatively impact the throughput
/// Total IO-uring submission queue count.
///
/// Must be larger than the total of:
///
/// UDP:
///
/// iouring_tun_rx_count + iouring_udp_rx_count +
/// iouring_tx_count + 1 (cancellation request)
///
/// TCP:
///
/// iouring_tun_rx_count + iouring_tx_count + 1 (cancellation
/// request) + 2 * maximum number of connections.
///
/// Each connection actually uses up to 3 slots, a persistent
/// recv request and on demand slots for TX and cancellation
/// (teardown).
///
/// There is no downside to setting this much larger.
#[clap(long, default_value_t = 1024)]
pub iouring_entry_count: usize,

Expand All @@ -87,6 +99,36 @@ pub struct Config {
#[clap(long, default_value = "100ms")]
pub iouring_sqpoll_idle_time: Duration,

/// Number of concurrent TUN device read requests to issue to
/// IO-uring. Setting this too large may negatively impact
/// performance.
#[clap(long, default_value_t = 64)]
pub iouring_tun_rx_count: u32,

/// Configure TUN device in blocking mode. This can allow
/// equivalent performance with fewer `ìouring-tun-rx-count`
/// entries but can significantly harm performance on some kernels
/// where the kernel does not indicate that the tun device handles
/// `FMODE_NOWAIT`.
///
/// If blocking mode is enabled then `iouring_tun_rx_count` may be
/// set much lower.
///
/// This was fixed by <https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=438b406055cd21105aad77db7938ee4720b09bee>
/// which was part of v6.4-rc1.
#[clap(long, default_value_t = false)]
pub iouring_tun_blocking: bool,

/// Number of concurrent UDP socket recvmsg requests to issue to
/// IO-uring.
#[clap(long, default_value_t = 32)]
pub iouring_udp_rx_count: u32,

/// Maximum number of concurrent UDP + TUN sendmsg/write requests
/// to issue to IO-uring.
#[clap(long, default_value_t = 512)]
pub iouring_tx_count: u32,

/// Log format
#[clap(long, value_enum, default_value_t = LogFormat::Full)]
pub log_format: LogFormat,
Expand All @@ -111,6 +153,10 @@ pub struct Config {
#[clap(long, default_value_t = ByteSize::mib(15))]
pub udp_buffer_size: ByteSize,

/// Set UDP buffer size. Default value is 256 KiB.
#[clap(long, default_value_t = ByteSize::kib(256))]
pub tcp_buffer_size: ByteSize,

/// Enable WolfSSL debug logging
#[cfg(feature = "debug")]
#[clap(long)]
Expand Down
Loading

0 comments on commit bc5fd2a

Please sign in to comment.