lightway-server: Use i/o uring for all i/o, not just tun.
This does not massively improve performance, but it reduces CPU overheads by
around 50%-100% (i.e. half to one core) under heavy traffic, adding perhaps a
few hundred Mbps to a speedtest.net download test and making a negligible
difference to the upload test. It also removes about 1ms from the latency in
the same tests. Finally, the standard deviation across multiple test runs
appears to be lower.

This appears to be due to a combination of avoiding async runtime overheads
and removing various channels/queues in favour of a more direct model of
interaction between the ring and the connections.

As well as those benefits, we are now able to reach the same level of
performance with far fewer slots used for the TUN rx path: here we use 64
slots (by default) and reach the same performance as using 1024 previously.
The way uring handles blocking vs async mode for tun devices seems to be
suboptimal. In blocking mode things are very slow. In async mode, as the
number of slots is increased, more and more time is spent on bookkeeping and
polling, plus a high rate of EAGAIN results (due to requests timing out after
multiple failed polls[^0]) wastes time requeueing. This is related to
axboe/liburing#886 and
axboe/liburing#239.

For UDP/TCP sockets, io_uring behaves well with the socket in blocking mode,
which avoids processing lots of EAGAIN results.
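The blocking-socket arrangement can be sketched with plain std types (a
hedged illustration of the idea, not the server's actual setup code):

```rust
use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    let sock = UdpSocket::bind("127.0.0.1:0")?;
    // Keep the socket in blocking mode: a blocking recv submitted to
    // io_uring parks in-kernel until data arrives, instead of completing
    // immediately with EAGAIN and needing to be requeued.
    sock.set_nonblocking(false)?;
    println!("blocking UDP socket bound to {}", sock.local_addr()?);
    Ok(())
}
```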

Tuning the slots for each I/O path is a bit of an art (more is definitely not
always better) and the sweet spot varies depending on the I/O device, so
various tunables are provided instead of just splitting the ring evenly. With
this there's no real reason to have a very large ring; it's the number of
inflight requests which matters.
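As a concrete instance of that budgeting, the UDP-mode arithmetic from the
new `iouring_entry_count` documentation can be sketched as follows (the
function name is illustrative, not from the patch):

```rust
/// Illustrative only: the minimum ring size for UDP mode is the sum of
/// the per-path slot tunables plus one slot for the cancellation request.
fn udp_required_entries(tun_rx: u32, udp_rx: u32, tx: u32) -> u32 {
    tun_rx + udp_rx + tx + 1
}

fn main() {
    // With the new defaults (64 TUN rx, 32 UDP rx, 512 tx) the default
    // ring size of 1024 entries leaves comfortable headroom.
    let needed = udp_required_entries(64, 32, 512);
    assert!(needed <= 1024);
    println!("minimum entries required: {needed}"); // 609
}
```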

This is specific to the server since it relies on kernel features and
correctness (i.e. a lack of bugs) which may not be upheld on an arbitrary
client system (while it is assumed that server operators have more control
over what they run). It is also not portable to non-Linux systems. It is
known to work with Linux 6.1 (as found in Debian 12, AKA bookworm).

Note that this kernel version contains a bug which causes the `iou-sqp-*`
kernel thread to get stuck (unkillable) if the tun is in blocking mode, so
blocking mode is gated behind an option. Enabling that option on a kernel
which contains [the fix][] allows equivalent performance with fewer slots on
the ring.

[^0]: When data becomes available _all_ requests are woken but only one will
      find data; the rest see EAGAIN, and after a certain number of such
      events io_uring propagates this back to userspace.
[the fix]: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=438b406055cd21105aad77db7938ee4720b09bee
xv-ian-c committed Nov 11, 2024
1 parent 1847932 commit 0832841
Showing 16 changed files with 1,665 additions and 433 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -52,3 +52,4 @@ tokio-util = "0.7.10"
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
twelf = { version = "0.15.0", default-features = false, features = ["env", "clap", "yaml"]}
tun = { version = "0.7.1" }
2 changes: 1 addition & 1 deletion lightway-app-utils/Cargo.toml
@@ -38,7 +38,7 @@ tokio-stream = { workspace = true, optional = true }
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
-tun = { version = "0.7", features = ["async"] }
+tun = { workspace = true, features = ["async"] }

[[example]]
name = "udprelay"
5 changes: 3 additions & 2 deletions lightway-server/Cargo.toml
@@ -9,9 +9,8 @@ license = "GPL-2.0-only"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
-default = ["io-uring"]
+default = []
debug = ["lightway-core/debug"]
-io-uring = ["lightway-app-utils/io-uring"]

[lints]
workspace = true
@@ -26,6 +25,7 @@ clap.workspace = true
ctrlc.workspace = true
delegate.workspace = true
educe.workspace = true
io-uring = "0.7.0"
ipnet.workspace = true
jsonwebtoken = "9.3.0"
libc.workspace = true
@@ -48,6 +48,7 @@ tokio-stream = { workspace = true, features = ["time"] }
tracing.workspace = true
tracing-log = "0.2.0"
tracing-subscriber = { workspace = true, features = ["json"] }
tun.workspace = true
twelf.workspace = true

[dev-dependencies]
55 changes: 48 additions & 7 deletions lightway-server/src/args.rs
@@ -71,16 +71,57 @@ pub struct Config {
#[clap(long, default_value_t)]
pub enable_pqc: bool,

/// Enable IO-uring interface for Tunnel
#[clap(long, default_value_t)]
pub enable_tun_iouring: bool,

/// IO-uring submission queue count. Only applicable when
/// `enable_tun_iouring` is `true`
// Any value more than 1024 negatively impact the throughput
/// Total IO-uring submission queue count.
///
/// Must be larger than the total of:
///
/// UDP:
///
/// iouring_tun_rx_count + iouring_udp_rx_count +
/// iouring_tx_count + 1 (cancellation request)
///
/// TCP:
///
/// iouring_tun_rx_count + iouring_tx_count + 1 (cancellation
/// request) + 2 * maximum number of connections.
///
/// Each connection actually uses up to 3 slots, a persistent
/// recv request and on demand slots for TX and cancellation
/// (teardown).
///
/// There is no downside to setting this much larger.
#[clap(long, default_value_t = 1024)]
pub iouring_entry_count: usize,

/// Number of concurrent TUN device read requests to issue to
/// IO-uring. Setting this too large may negatively impact
/// performance.
#[clap(long, default_value_t = 64)]
pub iouring_tun_rx_count: u32,

/// Configure TUN device in blocking mode. This can allow
/// equivalent performance with fewer `iouring-tun-rx-count`
/// entries but can significantly harm performance on some kernels
/// where the kernel does not indicate that the tun device handles
/// `FMODE_NOWAIT`.
///
/// If blocking mode is enabled then `iouring_tun_rx_count` may be
/// set much lower.
///
/// This was fixed by <https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=438b406055cd21105aad77db7938ee4720b09bee>
#[clap(long, default_value_t = false)]
pub iouring_tun_blocking: bool,

/// Number of concurrent UDP socket recvmsg requests to issue to
/// IO-uring.
#[clap(long, default_value_t = 32)]
pub iouring_udp_rx_count: u32,

/// Maximum number of concurrent UDP + TUN sendmsg/write requests
/// to issue to IO-uring.
#[clap(long, default_value_t = 512)]
pub iouring_tx_count: u32,

/// Log format
#[clap(long, value_enum, default_value_t = LogFormat::Full)]
pub log_format: LogFormat,
266 changes: 266 additions & 0 deletions lightway-server/src/io.rs
@@ -1,2 +1,268 @@
pub(crate) mod inside;
pub(crate) mod outside;

mod ffi;
mod tx;

use std::{
os::fd::{AsRawFd, OwnedFd, RawFd},
sync::{Arc, Mutex},
};

use anyhow::{anyhow, Result};
use io_uring::{
cqueue::Entry as CEntry,
opcode,
squeue::Entry as SEntry,
types::{Fd, Fixed},
IoUring, SubmissionQueue, Submitter,
};

use ffi::{iovec, msghdr};
pub use tx::TxQueue;

const IOURING_SQPOLL_IDLE_TIME_MS: u32 = 100;

/// Convenience function to handle errors in uring result codes
/// (which are negative errno values).
fn io_uring_res(res: i32) -> std::io::Result<i32> {
if res < 0 {
Err(std::io::Error::from_raw_os_error(-res))
} else {
Ok(res)
}
}
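For example, a CQE result of `-11` (EAGAIN on Linux, which Rust maps to
`WouldBlock`) becomes an error, while non-negative results pass through
unchanged. A standalone copy of the helper demonstrates this:

```rust
// Standalone copy of `io_uring_res` for illustration: uring completion
// results carry negative errno values on failure.
fn io_uring_res(res: i32) -> std::io::Result<i32> {
    if res < 0 {
        Err(std::io::Error::from_raw_os_error(-res))
    } else {
        Ok(res)
    }
}

fn main() {
    // A successful read/recv reports the byte count unchanged.
    assert_eq!(io_uring_res(1500).unwrap(), 1500);
    // -11 is EAGAIN on Linux; Rust surfaces it as WouldBlock.
    let err = io_uring_res(-11).unwrap_err();
    assert_eq!(err.kind(), std::io::ErrorKind::WouldBlock);
    println!("ok");
}
```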

/// An I/O source pushing requests to a uring instance
pub(crate) trait UringIoSource: Send {
/// Return the raw file descriptor. This will be registered as an
/// fd with the ring, allowing the use of io_uring::types::Fixed.
fn as_raw_fd(&self) -> RawFd;

/// Push the initial set of requests to `sq`.
fn push_initial_ops(&mut self, sq: &mut io_uring::SubmissionQueue) -> Result<()>;

/// Complete an rx request
fn complete_rx(
&mut self,
sq: &mut io_uring::SubmissionQueue,
cqe: io_uring::cqueue::Entry,
idx: u32,
) -> Result<()>;

/// Complete a tx request
fn complete_tx(
&mut self,
sq: &mut io_uring::SubmissionQueue,
cqe: io_uring::cqueue::Entry,
idx: u32,
) -> Result<()>;
}

pub(crate) enum OutsideIoSource {
Udp(outside::udp::UdpServer),
Tcp(outside::tcp::TcpServer),
}

// Avoiding `dyn`amic dispatch is a small performance win.
impl UringIoSource for OutsideIoSource {
fn as_raw_fd(&self) -> RawFd {
match self {
OutsideIoSource::Udp(udp) => udp.as_raw_fd(),
OutsideIoSource::Tcp(tcp) => tcp.as_raw_fd(),
}
}

fn push_initial_ops(&mut self, sq: &mut io_uring::SubmissionQueue) -> Result<()> {
match self {
OutsideIoSource::Udp(udp) => udp.push_initial_ops(sq),
OutsideIoSource::Tcp(tcp) => tcp.push_initial_ops(sq),
}
}

fn complete_rx(
&mut self,
sq: &mut io_uring::SubmissionQueue,
cqe: io_uring::cqueue::Entry,
idx: u32,
) -> Result<()> {
match self {
OutsideIoSource::Udp(udp) => udp.complete_rx(sq, cqe, idx),
OutsideIoSource::Tcp(tcp) => tcp.complete_rx(sq, cqe, idx),
}
}

fn complete_tx(
&mut self,
sq: &mut io_uring::SubmissionQueue,
cqe: io_uring::cqueue::Entry,
idx: u32,
) -> Result<()> {
match self {
OutsideIoSource::Udp(udp) => udp.complete_tx(sq, cqe, idx),
OutsideIoSource::Tcp(tcp) => tcp.complete_tx(sq, cqe, idx),
}
}
}

pub(crate) struct Loop {
ring: IoUring,

tx: Arc<Mutex<TxQueue>>,

cancel_buf: u8,

outside: OutsideIoSource,
inside: inside::tun::Tun,
}

impl Loop {
/// Used for outside IO requests; `self.outside.as_raw_fd` will be registered in this slot.
const FIXED_OUTSIDE_FD: Fixed = Fixed(0);
/// Used for inside IO requests; `self.inside.as_raw_fd` will be registered in this slot.
const FIXED_INSIDE_FD: Fixed = Fixed(1);

/// Masks the bits used by `*_USER_DATA_BASE`
const USER_DATA_TYPE_MASK: u64 = 0xe000_0000_0000_0000;

/// Indexes in this range will result in a call to `self.outside.complete_rx`
const OUTSIDE_RX_USER_DATA_BASE: u64 = 0xc000_0000_0000_0000;
/// Indexes in this range will result in a call to `self.outside.complete_tx`
const OUTSIDE_TX_USER_DATA_BASE: u64 = 0x8000_0000_0000_0000;

/// Indexes in this range will result in a call to `self.inside.complete_rx`
const INSIDE_RX_USER_DATA_BASE: u64 = 0x4000_0000_0000_0000;
/// Indexes in this range will result in a call to `self.inside.complete_tx`
const INSIDE_TX_USER_DATA_BASE: u64 = 0x2000_0000_0000_0000;

/// Indexes in this range are used by `Loop` itself.
const CONTROL_USER_DATA_BASE: u64 = 0x0000_0000_0000_0000;

/// A read request on the cancellation fd (used to exit the io loop)
const CANCEL_USER_DATA: u64 = Self::CONTROL_USER_DATA_BASE + 1;

/// Return user data for a particular outside rx index.
fn outside_rx_user_data(idx: u32) -> u64 {
Self::OUTSIDE_RX_USER_DATA_BASE + (idx as u64)
}

/// Return user data for a particular inside rx index.
fn inside_rx_user_data(idx: u32) -> u64 {
Self::INSIDE_RX_USER_DATA_BASE + (idx as u64)
}

/// Return user data for a particular inside tx index.
fn inside_tx_user_data(idx: u32) -> u64 {
Self::INSIDE_TX_USER_DATA_BASE + (idx as u64)
}

/// Return user data for a particular outside tx index.
fn outside_tx_user_data(idx: u32) -> u64 {
Self::OUTSIDE_TX_USER_DATA_BASE + (idx as u64)
}
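The tag-and-index scheme above can be exercised in isolation. This standalone
sketch copies two of the constants to show that masking recovers the handler
type and subtraction recovers the per-source slot index:

```rust
// Copies of the Loop constants, for illustration only.
const USER_DATA_TYPE_MASK: u64 = 0xe000_0000_0000_0000;
const OUTSIDE_RX_USER_DATA_BASE: u64 = 0xc000_0000_0000_0000;

fn main() {
    // The top three bits of the 64-bit user_data select the completion
    // handler; the low bits carry the slot index within that source.
    let user_data = OUTSIDE_RX_USER_DATA_BASE + 42;
    assert_eq!(user_data & USER_DATA_TYPE_MASK, OUTSIDE_RX_USER_DATA_BASE);
    assert_eq!(user_data - OUTSIDE_RX_USER_DATA_BASE, 42);
    println!("slot {}", user_data - OUTSIDE_RX_USER_DATA_BASE); // slot 42
}
```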

pub(crate) fn new(
ring_size: usize,
tx: Arc<Mutex<TxQueue>>,
outside: OutsideIoSource,
inside: inside::tun::Tun,
) -> Result<Self> {
tracing::info!(ring_size, "creating IoUring");
let ring: IoUring<SEntry, CEntry> = IoUring::builder()
.dontfork()
.setup_sqpoll(IOURING_SQPOLL_IDLE_TIME_MS) // Needs 5.13
.build(ring_size as u32)?;

Ok(Self {
ring,
tx,
cancel_buf: 0,
outside,
inside,
})
}

pub(crate) fn run(mut self, cancel: OwnedFd) -> Result<()> {
let (submitter, mut sq, mut cq) = self.ring.split();

submitter.register_files(&[self.outside.as_raw_fd(), self.inside.as_raw_fd()])?;

let sqe = opcode::Read::new(
Fd(cancel.as_raw_fd()),
&mut self.cancel_buf as *mut _,
std::mem::size_of_val(&self.cancel_buf) as _,
)
.build()
.user_data(Self::CANCEL_USER_DATA);

#[allow(unsafe_code)]
// SAFETY: The buffer is owned by `self.cancel_buf` and `self` is owned
unsafe {
sq.push(&sqe)?
};

self.outside.push_initial_ops(&mut sq)?;
self.inside.push_initial_ops(&mut sq)?;
sq.sync();

loop {
let _ = submitter.submit_and_wait(1)?;

cq.sync();

for cqe in &mut cq {
let user_data = cqe.user_data();

match user_data & Self::USER_DATA_TYPE_MASK {
Self::CONTROL_USER_DATA_BASE => {
match user_data - Self::CONTROL_USER_DATA_BASE {
Self::CANCEL_USER_DATA => {
let res = cqe.result();
tracing::debug!(?res, "Uring cancelled");
return Ok(());
}
idx => {
return Err(anyhow!(
"Unknown control data {user_data:016x} => {idx:016x}"
))
}
}
}
Self::OUTSIDE_RX_USER_DATA_BASE => {
self.outside.complete_rx(
&mut sq,
cqe,
(user_data - Self::OUTSIDE_RX_USER_DATA_BASE) as u32,
)?;
}
Self::OUTSIDE_TX_USER_DATA_BASE => {
self.outside.complete_tx(
&mut sq,
cqe,
(user_data - Self::OUTSIDE_TX_USER_DATA_BASE) as u32,
)?;
}

Self::INSIDE_RX_USER_DATA_BASE => {
self.inside.complete_rx(
&mut sq,
cqe,
(user_data - Self::INSIDE_RX_USER_DATA_BASE) as u32,
)?;
}
Self::INSIDE_TX_USER_DATA_BASE => {
self.inside.complete_tx(
&mut sq,
cqe,
(user_data - Self::INSIDE_TX_USER_DATA_BASE) as u32,
)?;
}

_ => unreachable!(),
}

self.tx.lock().unwrap().drain(&submitter, &mut sq)?;
}
}
}
}
