diff --git a/Cargo.toml b/Cargo.toml index 50fdc0df..24cc6700 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ keywords = ["async", "fs", "io-uring"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.2", features = ["net", "rt"] } +tokio = { version = "1.2", features = ["net", "rt", "sync"] } slab = "0.4.2" libc = "0.2.80" io-uring = { version = "0.5.12", features = ["unstable"] } diff --git a/src/buf/fixed/handle.rs b/src/buf/fixed/handle.rs index f142f452..5e77c125 100644 --- a/src/buf/fixed/handle.rs +++ b/src/buf/fixed/handle.rs @@ -8,7 +8,7 @@ use std::ops::{Deref, DerefMut}; use std::rc::Rc; // Data to construct a `FixedBuf` handle from. -pub(super) struct CheckedOutBuf { +pub(crate) struct CheckedOutBuf { // Pointer and size of the buffer. pub iovec: iovec, // Length of the initialized part. diff --git a/src/buf/fixed/mod.rs b/src/buf/fixed/mod.rs index 3a8d054e..80641f78 100644 --- a/src/buf/fixed/mod.rs +++ b/src/buf/fixed/mod.rs @@ -21,7 +21,9 @@ pub use handle::FixedBuf; mod buffers; pub(crate) use buffers::FixedBuffers; -mod pool; +mod plumbing; + +pub mod pool; pub use pool::FixedBufPool; mod registry; diff --git a/src/buf/fixed/plumbing/mod.rs b/src/buf/fixed/plumbing/mod.rs new file mode 100644 index 00000000..dd26ecb3 --- /dev/null +++ b/src/buf/fixed/plumbing/mod.rs @@ -0,0 +1,8 @@ +// Internal data structures shared between thread-local and thread-safe +// fixed buffer collections. 
+ +mod pool; +pub(super) use pool::Pool; + +mod registry; +pub(super) use registry::Registry; diff --git a/src/buf/fixed/plumbing/pool.rs b/src/buf/fixed/plumbing/pool.rs new file mode 100644 index 00000000..327acbc6 --- /dev/null +++ b/src/buf/fixed/plumbing/pool.rs @@ -0,0 +1,193 @@ +use crate::buf::fixed::{handle::CheckedOutBuf, FixedBuffers}; +use crate::buf::IoBufMut; + +use libc::{iovec, UIO_MAXIOV}; +use tokio::sync::Notify; + +use std::cmp; +use std::collections::HashMap; +use std::mem; +use std::ptr; +use std::slice; +use std::sync::Arc; + +// Internal state shared by FixedBufPool and FixedBuf handles. +pub(crate) struct Pool { + // Pointer to an allocated array of iovec records referencing + // the allocated buffers. The number of initialized records is the + // same as the length of the states array. + raw_bufs: ptr::NonNull, + // Original capacity of raw_bufs as a Vec. + orig_cap: usize, + // State information on the buffers. Indices in this array correspond to + // the indices in the array at raw_bufs. + states: Vec, + // Table of head indices of the free buffer lists in each size bucket. + free_buf_head_by_cap: HashMap, + // Original buffers, kept until drop + buffers: Vec, + // Used to notify tasks pending on `next` + notify_next_by_cap: HashMap>, +} + +// State information of a buffer in the registry, +enum BufState { + // The buffer is not in use. + Free { + // This field records the length of the initialized part. + init_len: usize, + // Index of the next buffer of the same capacity in a free buffer list, if any. + next: Option, + }, + // The buffer is checked out. + // Its data are logically owned by the FixedBuf handle, + // which also keeps track of the length of the initialized part. + CheckedOut, +} + +impl Pool { + pub(crate) fn new(bufs: impl Iterator) -> Self { + // Limit the number of buffers to the maximum allowable number. 
+ let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, u16::MAX as usize)); + // Collect into `buffers`, which holds the backing buffers for + // the lifetime of the pool. Using collect may allow + // the compiler to apply collect in place specialization, + // to avoid an allocation. + let mut buffers = bufs.collect::>(); + let mut iovecs = Vec::with_capacity(buffers.len()); + let mut states = Vec::with_capacity(buffers.len()); + let mut free_buf_head_by_cap = HashMap::new(); + for (index, buf) in buffers.iter_mut().enumerate() { + let cap = buf.bytes_total(); + + // Link the buffer as the head of the free list for its capacity. + // This constructs the free buffer list to be initially retrieved + // back to front, which should be of no difference to the user. + let next = free_buf_head_by_cap.insert(cap, index as u16); + + iovecs.push(iovec { + iov_base: buf.stable_mut_ptr() as *mut _, + iov_len: cap, + }); + states.push(BufState::Free { + init_len: buf.bytes_init(), + next, + }); + } + debug_assert_eq!(iovecs.len(), states.len()); + debug_assert_eq!(iovecs.len(), buffers.len()); + + // Safety: Vec::as_mut_ptr never returns null + let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) }; + let orig_cap = iovecs.capacity(); + mem::forget(iovecs); + Pool { + raw_bufs, + orig_cap, + states, + free_buf_head_by_cap, + buffers, + notify_next_by_cap: HashMap::new(), + } + } + + // If the free buffer list for this capacity is not empty, checks out the first buffer + // from the list and returns its data. Otherwise, returns None. 
+ pub(crate) fn try_next(&mut self, cap: usize) -> Option { + let free_head = self.free_buf_head_by_cap.get_mut(&cap)?; + let index = *free_head as usize; + let state = &mut self.states[index]; + + let (init_len, next) = match *state { + BufState::Free { init_len, next } => { + *state = BufState::CheckedOut; + (init_len, next) + } + BufState::CheckedOut => panic!("buffer is checked out"), + }; + + // Update the head of the free list for this capacity. + match next { + Some(i) => { + *free_head = i; + } + None => { + self.free_buf_head_by_cap.remove(&cap); + } + } + + // Safety: the allocated array under the pointer is valid + // for the lifetime of self, a free buffer index is inside the array, + // as also asserted by the indexing operation on the states array + // that has the same length. + let iovec = unsafe { self.raw_bufs.as_ptr().add(index).read() }; + debug_assert_eq!(iovec.iov_len, cap); + Some(CheckedOutBuf { + iovec, + init_len, + index: index as u16, + }) + } + + // Returns a `Notify` to use for waking up tasks awaiting a buffer of + // the specified capacity. + pub(crate) fn notify_on_next(&mut self, cap: usize) -> Arc { + let notify = self.notify_next_by_cap.entry(cap).or_default(); + Arc::clone(notify) + } + + fn check_in_internal(&mut self, index: u16, init_len: usize) { + let cap = self.iovecs()[index as usize].iov_len; + let state = &mut self.states[index as usize]; + debug_assert!( + matches!(state, BufState::CheckedOut), + "the buffer must be checked out" + ); + + // Link the buffer as the new head of the free list for its capacity. + // Recently checked in buffers will be first to be reused, + // improving cache locality. 
+ let next = self.free_buf_head_by_cap.insert(cap, index); + + *state = BufState::Free { init_len, next }; + + if let Some(notify) = self.notify_next_by_cap.get(&cap) { + // Wake up a single task pending on `next` + notify.notify_one(); + } + } +} + +impl FixedBuffers for Pool { + fn iovecs(&self) -> &[iovec] { + // Safety: the raw_bufs pointer is valid for the lifetime of self, + // the length of the states array is also the length of buffers array + // by construction. + unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) } + } + + unsafe fn check_in(&mut self, index: u16, init_len: usize) { + self.check_in_internal(index, init_len) + } +} + +impl Drop for Pool { + fn drop(&mut self) { + for (i, state) in self.states.iter().enumerate() { + match state { + BufState::Free { init_len, .. } => { + // Update buffer initialization. + // The buffer is about to be dropped, but this may release it + // from Pool ownership, rather than deallocate. + unsafe { self.buffers[i].set_init(*init_len) }; + } + BufState::CheckedOut => unreachable!("all buffers must be checked in"), + } + } + + // Rebuild Vec, so it's dropped + let _ = unsafe { + Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap) + }; + } +} diff --git a/src/buf/fixed/plumbing/registry.rs b/src/buf/fixed/plumbing/registry.rs new file mode 100644 index 00000000..ff8ffe30 --- /dev/null +++ b/src/buf/fixed/plumbing/registry.rs @@ -0,0 +1,140 @@ +use crate::buf::fixed::{handle::CheckedOutBuf, FixedBuffers}; +use crate::buf::IoBufMut; + +use libc::{iovec, UIO_MAXIOV}; +use std::cmp; +use std::mem; +use std::ptr; +use std::slice; + +// Internal state shared by FixedBufRegistry and FixedBuf handles. +pub(crate) struct Registry { + // Pointer to an allocated array of iovec records referencing + // the allocated buffers. The number of initialized records is the + // same as the length of the states array. + raw_bufs: ptr::NonNull, + // Original capacity of raw_bufs as a Vec. 
+ orig_cap: usize, + // State information on the buffers. Indices in this array correspond to + // the indices in the array at raw_bufs. + states: Vec, + // The owned buffers are kept until Drop + buffers: Vec, +} + +// State information of a buffer in the registry, +enum BufState { + // The buffer is not in use. + // The field records the length of the initialized part. + Free { init_len: usize }, + // The buffer is checked out. + // Its data are logically owned by the FixedBuf handle, + // which also keeps track of the length of the initialized part. + CheckedOut, +} + +impl Registry { + pub(crate) fn new(bufs: impl Iterator) -> Self { + // Limit the number of buffers to the maximum allowable number. + let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, u16::MAX as usize)); + // Collect into `buffers`, which holds the backing buffers for + // the lifetime of the pool. Using collect may allow + // the compiler to apply collect in place specialization, + // to avoid an allocation. + let mut buffers = bufs.collect::>(); + let mut iovecs = Vec::with_capacity(buffers.len()); + let mut states = Vec::with_capacity(buffers.len()); + for buf in buffers.iter_mut() { + iovecs.push(iovec { + iov_base: buf.stable_mut_ptr() as *mut _, + iov_len: buf.bytes_total(), + }); + states.push(BufState::Free { + init_len: buf.bytes_init(), + }); + } + debug_assert_eq!(iovecs.len(), states.len()); + debug_assert_eq!(iovecs.len(), buffers.len()); + + // Safety: Vec::as_mut_ptr never returns null + let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) }; + let orig_cap = iovecs.capacity(); + mem::forget(iovecs); + Registry { + raw_bufs, + orig_cap, + states, + buffers, + } + } + + // If the indexed buffer is free, changes its state to checked out + // and returns its data. + // If the buffer is already checked out, returns None. 
+ pub(crate) fn check_out(&mut self, index: usize) -> Option { + let state = self.states.get_mut(index)?; + let BufState::Free { init_len } = *state else { + return None + }; + + *state = BufState::CheckedOut; + + // Safety: the allocated array under the pointer is valid + // for the lifetime of self, the index is inside the array + // as checked by Vec::get_mut above, called on the array of + // states that has the same length. + let iovec = unsafe { self.raw_bufs.as_ptr().add(index).read() }; + debug_assert!(index <= u16::MAX as usize); + Some(CheckedOutBuf { + iovec, + init_len, + index: index as u16, + }) + } + + fn check_in_internal(&mut self, index: u16, init_len: usize) { + let state = self + .states + .get_mut(index as usize) + .expect("invalid buffer index"); + debug_assert!( + matches!(state, BufState::CheckedOut), + "the buffer must be checked out" + ); + *state = BufState::Free { init_len }; + } +} + +impl FixedBuffers for Registry { + fn iovecs(&self) -> &[iovec] { + // Safety: the raw_bufs pointer is valid for the lifetime of self, + // the length of the states array is also the length of buffers array + // by construction. + unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) } + } + + unsafe fn check_in(&mut self, index: u16, init_len: usize) { + self.check_in_internal(index, init_len) + } +} + +impl Drop for Registry { + fn drop(&mut self) { + for (i, state) in self.states.iter().enumerate() { + match state { + BufState::Free { init_len, .. } => { + // Update buffer initialization. + // The buffer is about to be dropped, but this may release it + // from Registry ownership, rather than deallocate. 
+ unsafe { self.buffers[i].set_init(*init_len) }; + } + BufState::CheckedOut => unreachable!("all buffers must be checked in"), + } + } + + // Rebuild Vec, so it's dropped + let _ = unsafe { + Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap) + }; + } +} diff --git a/src/buf/fixed/pool.rs b/src/buf/fixed/pool.rs index f1afc643..409d27e9 100644 --- a/src/buf/fixed/pool.rs +++ b/src/buf/fixed/pool.rs @@ -1,17 +1,25 @@ -use super::handle::CheckedOutBuf; -use super::{FixedBuf, FixedBuffers}; +//! A dynamic collection of I/O buffers pre-registered with the kernel. +//! +//! This module provides [`FixedBufPool`], a collection that implements +//! dynamic management of sets of interchangeable memory buffers +//! registered with the kernel for `io-uring` operations. Asynchronous +//! rotation of the buffers shared by multiple tasks is also supported +//! by [`FixedBufPool`]. +//! +//! [`FixedBufPool`]: self::FixedBufPool +use super::plumbing; +use super::FixedBuf; use crate::buf::IoBufMut; use crate::runtime::CONTEXT; -use libc::{iovec, UIO_MAXIOV}; + +use tokio::pin; +use tokio::sync::Notify; + use std::cell::RefCell; -use std::cmp; -use std::collections::HashMap; use std::io; -use std::mem; -use std::ptr; use std::rc::Rc; -use std::slice; +use std::sync::Arc; /// A dynamic collection of I/O buffers pre-registered with the kernel. /// @@ -20,7 +28,9 @@ use std::slice; /// context using the [`register`] method. Unlike [`FixedBufRegistry`], /// individual buffers are not retrieved by index; instead, an available /// buffer matching a specified capacity can be retrieved with the [`try_next`] -/// method. This allows some flexibility in managing sets of buffers with +/// method. In asynchronous contexts, the [`next`] method can be used to wait +/// until such a buffer becomes available. +/// This allows some flexibility in managing sets of buffers with /// different capacity tiers. 
The need to maintain lists of free buffers, /// however, imposes additional runtime overhead. /// @@ -38,8 +48,10 @@ use std::slice; /// /// [`register`]: Self::register /// [`try_next`]: Self::try_next +/// [`next`]: Self::next /// [`FixedBufRegistry`]: super::FixedBufRegistry /// [`Runtime`]: crate::Runtime +/// [`FixedBuf`]: super::FixedBuf /// /// # Examples /// @@ -83,7 +95,7 @@ use std::slice; /// ``` #[derive(Clone)] pub struct FixedBufPool { - inner: Rc>>, + inner: Rc>>, } impl FixedBufPool { @@ -152,7 +164,7 @@ impl FixedBufPool { /// ``` pub fn new(bufs: impl IntoIterator) -> Self { FixedBufPool { - inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))), + inner: Rc::new(RefCell::new(plumbing::Pool::new(bufs.into_iter()))), } } @@ -222,175 +234,65 @@ impl FixedBufPool { pub fn try_next(&self, cap: usize) -> Option { let mut inner = self.inner.borrow_mut(); inner.try_next(cap).map(|data| { - let registry = Rc::clone(&self.inner); + let pool = Rc::clone(&self.inner); // Safety: the validity of buffer data is ensured by - // Inner::try_next - unsafe { FixedBuf::new(registry, data) } + // plumbing::Pool::try_next + unsafe { FixedBuf::new(pool, data) } }) } -} - -// Internal state shared by FixedBufPool and FixedBuf handles. -struct Inner { - // Pointer to an allocated array of iovec records referencing - // the allocated buffers. The number of initialized records is the - // same as the length of the states array. - raw_bufs: ptr::NonNull, - // State information on the buffers. Indices in this array correspond to - // the indices in the array at raw_bufs. - states: Vec, - // Original capacity of raw_bufs as a Vec. - orig_cap: usize, - // Original buffers, kept until drop - buffers: Vec, - // Table of head indices of the free buffer lists in each size bucket. - free_buf_head_by_cap: HashMap, -} - -// State information of a buffer in the registry, -enum BufState { - // The buffer is not in use. 
- Free { - // This field records the length of the initialized part. - init_len: usize, - // Index of the next buffer of the same capacity in a free buffer list, if any. - next: Option, - }, - // The buffer is checked out. - // Its data are logically owned by the FixedBuf handle, - // which also keeps track of the length of the initialized part. - CheckedOut, -} - -impl Inner { - fn new(bufs: impl Iterator) -> Self { - // Limit the number of buffers to the maximum allowable number. - let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, u16::MAX as usize)); - // Collect into `buffers`, which holds the backing buffers for - // the lifetime of the pool. Using collect may allow - // the compiler to apply collect in place specialization, - // to avoid an allocation. - let mut buffers = bufs.collect::>(); - let mut iovecs = Vec::with_capacity(buffers.len()); - let mut states = Vec::with_capacity(buffers.len()); - let mut free_buf_head_by_cap = HashMap::new(); - for (index, buf) in buffers.iter_mut().enumerate() { - let cap = buf.bytes_total(); - - // Link the buffer as the head of the free list for its capacity. - // This constructs the free buffer list to be initially retrieved - // back to front, which should be of no difference to the user. - let next = free_buf_head_by_cap.insert(cap, index as u16); - - iovecs.push(iovec { - iov_base: buf.stable_mut_ptr() as *mut _, - iov_len: cap, - }); - states.push(BufState::Free { - init_len: buf.bytes_init(), - next, - }); - } - debug_assert_eq!(iovecs.len(), states.len()); - debug_assert_eq!(iovecs.len(), buffers.len()); - - // Safety: Vec::as_mut_ptr never returns null - let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) }; - let orig_cap = iovecs.capacity(); - mem::forget(iovecs); - Inner { - raw_bufs, - states, - orig_cap, - buffers, - free_buf_head_by_cap, - } - } - // If the free buffer list for this capacity is not empty, checks out the first buffer - // from the list and returns its data. 
Otherwise, returns None. - fn try_next(&mut self, cap: usize) -> Option { - let free_head = self.free_buf_head_by_cap.get_mut(&cap)?; - let index = *free_head as usize; - let state = &mut self.states[index]; - - let (init_len, next) = match *state { - BufState::Free { init_len, next } => { - *state = BufState::CheckedOut; - (init_len, next) + /// Resolves to a buffer of requested capacity + /// when it is or becomes available in this pool. + /// This may happen when a [`FixedBuf`] handle owning a buffer + /// of the same capacity is dropped. + /// + /// If no matching buffers are available and none are being released, + /// this asynchronous function will never resolve. Applications should take + /// care to wait on the returned future concurrently with some tasks that + /// will complete I/O operations owning the buffers, or back it up with a + /// timeout using, for example, `tokio::util::timeout`. + pub async fn next(&self, cap: usize) -> FixedBuf { + // Fast path: get the buffer if it's already available + let notify = { + let mut inner = self.inner.borrow_mut(); + if let Some(data) = inner.try_next(cap) { + // Safety: the validity of buffer data is ensured by + // plumbing::Pool::try_next + let buf = unsafe { FixedBuf::new(Rc::clone(&self.inner) as _, data) }; + return buf; } - BufState::CheckedOut => panic!("buffer is checked out"), + inner.notify_on_next(cap) }; - // Update the head of the free list for this capacity. - match next { - Some(i) => { - *free_head = i; - } - None => { - self.free_buf_head_by_cap.remove(&cap); - } - } - - // Safety: the allocated array under the pointer is valid - // for the lifetime of self, a free buffer index is inside the array, - // as also asserted by the indexing operation on the states array - // that has the same length. 
- let iovec = unsafe { self.raw_bufs.as_ptr().add(index).read() }; - debug_assert_eq!(iovec.iov_len, cap); - Some(CheckedOutBuf { - iovec, - init_len, - index: index as u16, - }) - } - - fn check_in_internal(&mut self, index: u16, init_len: usize) { - let cap = self.iovecs()[index as usize].iov_len; - let state = &mut self.states[index as usize]; - debug_assert!( - matches!(state, BufState::CheckedOut), - "the buffer must be checked out" - ); - - // Link the buffer as the new head of the free list for its capacity. - // Recently checked in buffers will be first to be reused, - // improving cache locality. - let next = self.free_buf_head_by_cap.insert(cap, index); - - *state = BufState::Free { init_len, next }; - } -} - -impl FixedBuffers for Inner { - fn iovecs(&self) -> &[iovec] { - // Safety: the raw_bufs pointer is valid for the lifetime of self, - // the length of the states array is also the length of buffers array - // by construction. - unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) } + // Poll for a buffer, engaging the `Notify` machinery. + self.next_when_notified(cap, notify).await } - unsafe fn check_in(&mut self, index: u16, init_len: usize) { - self.check_in_internal(index, init_len) - } -} + #[cold] + async fn next_when_notified(&self, cap: usize, notify: Arc) -> FixedBuf { + let notified = notify.notified(); + pin!(notified); + loop { + // In the single-threaded case, no buffers could get checked in + // between us calling `try_next` and here, so we can't miss a wakeup. + notified.as_mut().await; -impl Drop for Inner { - fn drop(&mut self) { - for (i, state) in self.states.iter().enumerate() { - match state { - BufState::Free { init_len, .. } => { - // Update buffer initalisation. - // The buffer is about to dropped, but this may release it - // from Registry ownership, rather than deallocate. 
- unsafe { self.buffers[i].set_init(*init_len) }; - } - BufState::CheckedOut => unreachable!("all buffers must be checked in"), + if let Some(data) = self.inner.borrow_mut().try_next(cap) { + // Safety: the validity of buffer data is ensured by + // plumbing::Pool::try_next + let buf = unsafe { FixedBuf::new(Rc::clone(&self.inner) as _, data) }; + return buf; } + + // It's possible that the task did not get a buffer from `try_next`. + // The `Notify` entries are created once for each requested capacity + // and never removed, so this `Notify` could have been holding + // a permit from a buffer checked in previously when no tasks were + // waiting. Then a task would call `next` on this pool and receive + // the buffer without consuming the permit. It's also possible that + // a task calls `try_next` directly. + // Reset the `Notified` future to wait for another wakeup. + notified.set(notify.notified()); } - // Rebuild Vec, so it's dropped - let _ = unsafe { - Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap) - }; } } diff --git a/src/buf/fixed/registry.rs b/src/buf/fixed/registry.rs index 2904bf81..cf6456be 100644 --- a/src/buf/fixed/registry.rs +++ b/src/buf/fixed/registry.rs @@ -1,16 +1,11 @@ -use super::handle::CheckedOutBuf; -use super::{FixedBuf, FixedBuffers}; +use super::plumbing; +use super::FixedBuf; use crate::buf::IoBufMut; use crate::runtime::CONTEXT; -use libc::{iovec, UIO_MAXIOV}; use std::cell::RefCell; -use std::cmp; use std::io; -use std::mem; -use std::ptr; use std::rc::Rc; -use std::slice; /// An indexed collection of I/O buffers pre-registered with the kernel. 
/// @@ -36,7 +31,7 @@ use std::slice; /// [`Runtime`]: crate::Runtime #[derive(Clone)] pub struct FixedBufRegistry { - inner: Rc>>, + inner: Rc>>, } impl FixedBufRegistry { @@ -105,7 +100,7 @@ impl FixedBufRegistry { /// ``` pub fn new(bufs: impl IntoIterator) -> Self { FixedBufRegistry { - inner: Rc::new(RefCell::new(Inner::new(bufs.into_iter()))), + inner: Rc::new(RefCell::new(plumbing::Registry::new(bufs.into_iter()))), } } @@ -173,140 +168,8 @@ impl FixedBufRegistry { inner.check_out(index).map(|data| { let registry = Rc::clone(&self.inner); // Safety: the validity of buffer data is ensured by - // Inner::check_out + // plumbing::Registry::check_out unsafe { FixedBuf::new(registry, data) } }) } } - -// Internal state shared by FixedBufRegistry and FixedBuf handles. -struct Inner { - // Pointer to an allocated array of iovec records referencing - // the allocated buffers. The number of initialized records is the - // same as the length of the states array. - raw_bufs: ptr::NonNull, - // State information on the buffers. Indices in this array correspond to - // the indices in the array at raw_bufs. - states: Vec, - // Original capacity of raw_bufs as a Vec. - orig_cap: usize, - // The owned buffers are kept until Drop - buffers: Vec, -} - -// State information of a buffer in the registry, -enum BufState { - // The buffer is not in use. - // The field records the length of the initialized part. - Free { init_len: usize }, - // The buffer is checked out. - // Its data are logically owned by the FixedBuf handle, - // which also keeps track of the length of the initialized part. - CheckedOut, -} - -impl Inner { - fn new(bufs: impl Iterator) -> Self { - // Limit the number of buffers to the maximum allowable number. - let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, u16::MAX as usize)); - // Collect into `buffers`, which holds the backing buffers for - // the lifetime of the pool. 
Using collect may allow - // the compiler to apply collect in place specialization, - // to avoid an allocation. - let mut buffers = bufs.collect::>(); - let mut iovecs = Vec::with_capacity(buffers.len()); - let mut states = Vec::with_capacity(buffers.len()); - for buf in buffers.iter_mut() { - iovecs.push(iovec { - iov_base: buf.stable_mut_ptr() as *mut _, - iov_len: buf.bytes_total(), - }); - states.push(BufState::Free { - init_len: buf.bytes_init(), - }); - } - debug_assert_eq!(iovecs.len(), states.len()); - debug_assert_eq!(iovecs.len(), buffers.len()); - - // Safety: Vec::as_mut_ptr never returns null - let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) }; - let orig_cap = iovecs.capacity(); - mem::forget(iovecs); - Inner { - raw_bufs, - states, - orig_cap, - buffers, - } - } - - // If the indexed buffer is free, changes its state to checked out - // and returns its data. - // If the buffer is already checked out, returns None. - fn check_out(&mut self, index: usize) -> Option { - let state = self.states.get_mut(index)?; - let BufState::Free { init_len } = *state else { - return None - }; - - *state = BufState::CheckedOut; - - // Safety: the allocated array under the pointer is valid - // for the lifetime of self, the index is inside the array - // as checked by Vec::get_mut above, called on the array of - // states that has the same length. 
- let iovec = unsafe { self.raw_bufs.as_ptr().add(index).read() }; - debug_assert!(index <= u16::MAX as usize); - Some(CheckedOutBuf { - iovec, - init_len, - index: index as u16, - }) - } - - fn check_in_internal(&mut self, index: u16, init_len: usize) { - let state = self - .states - .get_mut(index as usize) - .expect("invalid buffer index"); - debug_assert!( - matches!(state, BufState::CheckedOut), - "the buffer must be checked out" - ); - *state = BufState::Free { init_len }; - } -} - -impl FixedBuffers for Inner { - fn iovecs(&self) -> &[iovec] { - // Safety: the raw_bufs pointer is valid for the lifetime of self, - // the length of the states array is also the length of buffers array - // by construction. - unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) } - } - - unsafe fn check_in(&mut self, index: u16, init_len: usize) { - self.check_in_internal(index, init_len) - } -} - -impl Drop for Inner { - fn drop(&mut self) { - for (i, state) in self.states.iter().enumerate() { - match state { - BufState::Free { init_len, .. } => { - // Update buffer initialisation. - // The buffer is about to be dropped, but this may release it - // from Registry ownership, rather than deallocate. 
- unsafe { self.buffers[i].set_init(*init_len) }; - } - BufState::CheckedOut => unreachable!("all buffers must be checked in"), - } - } - - // Rebuild Vec, so it's dropped - let _ = unsafe { - Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap) - }; - } -} diff --git a/tests/fixed_buf.rs b/tests/fixed_buf.rs index fc28e9db..a5442217 100644 --- a/tests/fixed_buf.rs +++ b/tests/fixed_buf.rs @@ -1,9 +1,11 @@ use tokio_test::assert_err; -use tokio_uring::buf::fixed::FixedBufRegistry; -use tokio_uring::buf::BoundedBuf; +use tokio_uring::buf::fixed::{FixedBufPool, FixedBufRegistry}; +use tokio_uring::buf::{BoundedBuf, BoundedBufMut}; use tokio_uring::fs::File; +use std::fs::File as StdFile; use std::io::prelude::*; +use std::iter; use std::mem; use tempfile::NamedTempFile; @@ -97,7 +99,7 @@ fn slicing() { tempfile.write_all(HELLO).unwrap(); let file = File::from_std( - std::fs::File::options() + StdFile::options() .read(true) .write(true) .open(tempfile.path()) @@ -137,6 +139,54 @@ fn slicing() { }) } +#[test] +fn pool_next_as_concurrency_limit() { + tokio_uring::start(async move { + const BUF_SIZE: usize = 80; + + let mut tempfile = tempfile(); + let file = StdFile::options() + .write(true) + .open(tempfile.path()) + .unwrap(); + + let buffers = FixedBufPool::new(iter::repeat_with(|| Vec::with_capacity(BUF_SIZE)).take(2)); + buffers.register().unwrap(); + + let mut join_handles = vec![]; + for i in 0..10 { + let mut buf = buffers.next(BUF_SIZE).await; + println!( + "[main] iteration {}: obtained buffer {}", + i, + buf.buf_index() + ); + let cloned_file = file.try_clone().unwrap(); + + let handle = tokio_uring::spawn(async move { + let file = File::from_std(cloned_file); + let data = [b'0' + i as u8; BUF_SIZE]; + buf.put_slice(&data); + let (res, buf) = file.write_fixed_all_at(buf, BUF_SIZE as u64 * i).await; + res.unwrap(); + println!("[worker {}]: dropping buffer {}", i, buf.buf_index()); + }); + + join_handles.push(handle); + } + for (i, 
handle) in join_handles.into_iter().enumerate() { + handle + .await + .unwrap_or_else(|e| panic!("worker {} terminated abnormally: {}", i, e)); + } + + mem::drop(file); + let mut content = String::new(); + tempfile.read_to_string(&mut content).unwrap(); + println!("{}", content); + }) +} + fn tempfile() -> NamedTempFile { NamedTempFile::new().unwrap() }