Skip to content

Commit

Permalink
Fixed buffers to support ReadFixed and WriteFixed ops (#54)
Browse files Browse the repository at this point in the history
Add infrastructure to manage pre-registered buffers, and operation
methods `File::read_fixed_at` and `File::write_fixed_at` to make use of
them. Exclusive access to buffer data between the application and
in-flight ops is controlled at runtime.

This is initial API to enable fixed buffers. Future developments may
include:

- ✅ Improved parameter polymorphism with ~#53~ #172;
- An internal linked list of free buffers, to be able to check out the
next available buffer from `FixedBufRegistry` without enumerating or
keeping track of indices;
- Support `IORING_REGISTER_BUFFERS2`/`IORING_REGISTER_BUFFERS_UPDATE`.
  • Loading branch information
mzabaluev authored Nov 23, 2022
1 parent 0e1c987 commit 8750caa
Show file tree
Hide file tree
Showing 17 changed files with 946 additions and 1 deletion.
113 changes: 113 additions & 0 deletions src/buf/fixed/buffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use libc::{iovec, UIO_MAXIOV};
use std::cmp;
use std::mem;
use std::ptr;
use std::slice;

// Internal state shared by FixedBufRegistry and FixedBuf handles.
pub(crate) struct FixedBuffers {
// Pointer to an allocated array of iovec records referencing
// the allocated buffers. The number of initialized records is the
// same as the length of the states array.
raw_bufs: ptr::NonNull<iovec>,
// State information on the buffers. Indices in this array correspond to
// the indices in the array at raw_bufs.
states: Vec<BufState>,
// Original capacity of raw_bufs as a Vec.
orig_cap: usize,
}

// State information of a buffer in the registry,
enum BufState {
// The buffer is not in use.
// The field records the length of the initialized part.
Free { init_len: usize },
// The buffer is checked out.
// Its data are logically owned by the FixedBuf handle,
// which also keeps track of the length of the initialized part.
CheckedOut,
}

impl FixedBuffers {
pub(crate) fn new(bufs: impl Iterator<Item = Vec<u8>>) -> Self {
let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, 65_536));
let (size_hint, _) = bufs.size_hint();
let mut iovecs = Vec::with_capacity(size_hint);
let mut states = Vec::with_capacity(size_hint);
for mut buf in bufs {
iovecs.push(iovec {
iov_base: buf.as_mut_ptr() as *mut _,
iov_len: buf.capacity(),
});
states.push(BufState::Free {
init_len: buf.len(),
});
mem::forget(buf);
}
debug_assert_eq!(iovecs.len(), states.len());
// Safety: Vec::as_mut_ptr never returns null
let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) };
let orig_cap = iovecs.capacity();
mem::forget(iovecs);
FixedBuffers {
raw_bufs,
states,
orig_cap,
}
}

// If the indexed buffer is free, changes its state to checked out and
// returns its data. If the buffer is already checked out, returns None.
pub(crate) fn check_out(&mut self, index: usize) -> Option<(iovec, usize)> {
let iovecs_ptr = self.raw_bufs;
self.states.get_mut(index).and_then(|state| match *state {
BufState::Free { init_len } => {
*state = BufState::CheckedOut;
// Safety: the allocated array under the pointer is valid
// for the lifetime of self, the index is inside the array
// as checked by Vec::get_mut above, called on the array of
// states that has the same length.
let iovec = unsafe { iovecs_ptr.as_ptr().add(index).read() };
Some((iovec, init_len))
}
BufState::CheckedOut => None,
})
}

// Sets the indexed buffer's state to free and records the updated length
// of its initialized part. The buffer addressed must be in the checked out
// state, otherwise this function may panic.
pub(crate) fn check_in(&mut self, index: usize, init_len: usize) {
let state = self.states.get_mut(index).expect("invalid buffer index");
debug_assert!(
matches!(state, BufState::CheckedOut),
"the buffer must be checked out"
);
*state = BufState::Free { init_len };
}

pub(crate) fn iovecs(&self) -> &[iovec] {
// Safety: the raw_bufs pointer is valid for the lifetime of self,
// the slice length is valid by construction.
unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) }
}
}

impl Drop for FixedBuffers {
fn drop(&mut self) {
let iovecs = unsafe {
Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap)
};
for (i, iovec) in iovecs.iter().enumerate() {
match self.states[i] {
BufState::Free { init_len } => {
let ptr = iovec.iov_base as *mut u8;
let cap = iovec.iov_len;
let v = unsafe { Vec::from_raw_parts(ptr, init_len, cap) };
mem::drop(v);
}
BufState::CheckedOut => unreachable!("all buffers must be checked in"),
}
}
}
}
110 changes: 110 additions & 0 deletions src/buf/fixed/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use super::FixedBuffers;
use crate::buf::{IoBuf, IoBufMut};

use libc::iovec;
use std::cell::RefCell;
use std::fmt::{self, Debug};
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;

/// A unique handle to a memory buffer that can be pre-registered with
/// the kernel for `io-uring` operations.
///
/// `FixedBuf` handles can be obtained from a [`FixedBufRegistry`] collection.
/// For each buffer, only a single `FixedBuf` handle can be either used by the
/// application code or owned by an I/O operation at any given time,
/// thus avoiding data races between `io-uring` operations in flight and
/// the application accessing buffer data.
///
/// [`FixedBufRegistry`]: super::FixedBufRegistry
///
pub struct FixedBuf {
registry: Rc<RefCell<FixedBuffers>>,
buf: ManuallyDrop<Vec<u8>>,
index: u16,
}

impl Drop for FixedBuf {
fn drop(&mut self) {
let mut registry = self.registry.borrow_mut();
debug_assert_eq!(
registry.iovecs()[self.index as usize].iov_base as *const u8,
self.buf.as_ptr()
);
debug_assert_eq!(
registry.iovecs()[self.index as usize].iov_len,
self.buf.capacity()
);
registry.check_in(self.index as usize, self.buf.len());
}
}

impl FixedBuf {
pub(super) unsafe fn new(
registry: Rc<RefCell<FixedBuffers>>,
iovec: iovec,
init_len: usize,
index: u16,
) -> Self {
let buf = Vec::from_raw_parts(iovec.iov_base as _, init_len, iovec.iov_len);
FixedBuf {
registry,
buf: ManuallyDrop::new(buf),
index,
}
}

pub(crate) fn buf_index(&self) -> u16 {
self.index
}
}

unsafe impl IoBuf for FixedBuf {
fn stable_ptr(&self) -> *const u8 {
self.buf.as_ptr()
}

fn bytes_init(&self) -> usize {
self.buf.len()
}

fn bytes_total(&self) -> usize {
self.buf.capacity()
}
}

unsafe impl IoBufMut for FixedBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.buf.as_mut_ptr()
}

unsafe fn set_init(&mut self, pos: usize) {
if self.buf.len() < pos {
self.buf.set_len(pos)
}
}
}

impl Deref for FixedBuf {
type Target = [u8];

fn deref(&self) -> &[u8] {
&self.buf
}
}

impl DerefMut for FixedBuf {
fn deref_mut(&mut self) -> &mut [u8] {
&mut self.buf
}
}

impl Debug for FixedBuf {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FixedBuf")
.field("buf", &*self.buf) // deref ManuallyDrop
.field("index", &self.index)
.finish_non_exhaustive()
}
}
21 changes: 21 additions & 0 deletions src/buf/fixed/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! Buffers pre-registered with the kernel.
//!
//! This module provides facilities for registering in-memory buffers with
//! the `tokio-uring` runtime. Operations like [`File::read_fixed_at`][rfa] and
//! [`File::write_fixed_at`][wfa] make use of buffers pre-mapped by
//! the kernel to reduce per-I/O overhead.
//! The [`FixedBufRegistry::register`] method is used to register a collection of
//! buffers with the kernel; it must be called before any of the [`FixedBuf`]
//! handles to the collection's buffers can be used with I/O operations.
//!
//! [rfa]: crate::fs::File::read_fixed_at
//! [wfa]: crate::fs::File::write_fixed_at
mod buffers;
pub(crate) use self::buffers::FixedBuffers;

mod handle;
pub use handle::FixedBuf;

mod registry;
pub use registry::FixedBufRegistry;
114 changes: 114 additions & 0 deletions src/buf/fixed/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use super::{buffers::FixedBuffers, FixedBuf};

use std::cell::RefCell;
use std::io;
use std::rc::Rc;

/// An indexed collection of I/O buffers pre-registered with the kernel.
///
/// `FixedBufRegistry` allows the application to manage a collection of buffers
/// allocated in memory, that can be registered in the current `tokio-uring`
/// context using the [`register`] method.
///
/// A `FixedBufRegistry` value is a lightweight handle for a collection of
/// allocated buffers. Cloning of a `FixedBufRegistry` creates a new reference to
/// the same collection of buffers.
///
/// The buffers of the collection are not deallocated until:
/// - all `FixedBufRegistry` references to the collection have been dropped;
/// - all [`FixedBuf`] handles to individual buffers in the collection have
/// been dropped, including the buffer handles owned by any I/O operations
/// in flight;
/// - The `tokio-uring` [`Runtime`] the buffers are registered with
/// has been dropped.
///
/// [`register`]: Self::register
/// [`Runtime`]: crate::Runtime
#[derive(Clone)]
pub struct FixedBufRegistry {
inner: Rc<RefCell<FixedBuffers>>,
}

impl FixedBufRegistry {
/// Creates a new collection of buffers from the provided allocated vectors.
///
/// The buffers are assigned 0-based indices in the order of the iterable
/// input parameter. The returned collection takes up to [`UIO_MAXIOV`]
/// buffers from the input. Any items in excess of that amount are silently
/// dropped, unless the input iterator produces the vectors lazily.
///
/// [`UIO_MAXIOV`]: libc::UIO_MAXIOV
///
/// # Examples
///
/// ```
/// use tokio_uring::buf::fixed::FixedBufRegistry;
/// use std::iter;
///
/// let registry = FixedBufRegistry::new(iter::repeat(vec![0; 4096]).take(10));
/// ```
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
FixedBufRegistry {
inner: Rc::new(RefCell::new(FixedBuffers::new(bufs.into_iter()))),
}
}

/// Registers the buffers with the kernel.
///
/// This method must be called in the context of a `tokio-uring` runtime.
/// The registration persists for the lifetime of the runtime, unless
/// revoked by the [`unregister`] method. Dropping the
/// `FixedBufRegistry` instance this method has been called on does not revoke
/// the registration or deallocate the buffers.
///
/// [`unregister`]: Self::unregister
///
/// This call can be blocked in the kernel to complete any operations
/// in-flight on the same `io-uring` instance. The application is
/// recommended to register buffers before starting any I/O operations.
///
/// # Errors
///
/// If a collection of buffers is currently registered in the context
/// of the `tokio-uring` runtime this call is made in, the function returns
/// an error.
pub fn register(&self) -> io::Result<()> {
crate::io::register_buffers(&self.inner)
}

/// Unregisters this collection of buffers.
///
/// This method must be called in the context of a `tokio-uring` runtime,
/// where the buffers should have been previously registered.
///
/// This operation invalidates any `FixedBuf` handles checked out from
/// this registry instance. Continued use of such handles in I/O
/// operations may result in an error.
///
/// # Errors
///
/// If another collection of buffers is currently registered in the context
/// of the `tokio-uring` runtime this call is made in, the function returns
/// an error. Calling `unregister` when no `FixedBufRegistry` is currently
/// registered on this runtime also returns an error.
pub fn unregister(&self) -> io::Result<()> {
crate::io::unregister_buffers(&self.inner)
}

/// Returns a buffer identified by the specified index for use by the
/// application, unless the buffer is already in use.
///
/// The buffer is released to be available again once the
/// returned `FixedBuf` handle has been dropped. An I/O operation
/// using the buffer takes ownership of it and returns it once completed,
/// preventing shared use of the buffer while the operation is in flight.
pub fn check_out(&self, index: usize) -> Option<FixedBuf> {
let mut inner = self.inner.borrow_mut();
inner.check_out(index).map(|(iovec, init_len)| {
debug_assert!(index <= u16::MAX as usize);
// Safety: the validity of iovec and init_len is ensured by
// FixedBuffers::check_out
unsafe { FixedBuf::new(Rc::clone(&self.inner), iovec, init_len, index as u16) }
})
}
}
2 changes: 2 additions & 0 deletions src/buf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
//! crate defines [`IoBuf`] and [`IoBufMut`] traits which are implemented by buffer
//! types that respect the `io-uring` contract.
pub mod fixed;

mod io_buf;
pub use io_buf::IoBuf;

Expand Down
Loading

0 comments on commit 8750caa

Please sign in to comment.