From ce36b817bb13ac23ecdc3dd58479f1b7c41071a1 Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Sat, 22 Oct 2022 16:49:08 -0500 Subject: [PATCH 1/6] rt: refactor runtime to avoid Rc> This change refactors the runtime to not use reference counting directly in the ops themselves. Instead, ops access the driver via thread local variables. This is sound because dropping the driver (which happens when it is removed from its thread-local state) blocks the thread until all ops complete, ensuring that we do not free the contents of the driver until after all operations have completed. --- src/driver/mod.rs | 47 ++---- src/driver/op.rs | 251 ++++++++++++++++------------- src/lib.rs | 1 + src/runtime/context.rs | 56 +++++++ src/{runtime.rs => runtime/mod.rs} | 71 +++++--- src/util.rs | 4 + 6 files changed, 255 insertions(+), 175 deletions(-) create mode 100644 src/runtime/context.rs rename src/{runtime.rs => runtime/mod.rs} (58%) create mode 100644 src/util.rs diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 78d121e7..48020b19 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -39,21 +39,12 @@ mod write; mod writev; -use io_uring::IoUring; -use scoped_tls::scoped_thread_local; +use io_uring::{cqueue, IoUring}; use slab::Slab; -use std::cell::RefCell; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; -use std::rc::Rc; pub(crate) struct Driver { - inner: Handle, -} - -type Handle = Rc>; - -pub(crate) struct Inner { /// In-flight operations ops: Ops, @@ -67,45 +58,25 @@ struct Ops { lifecycle: Slab, } -scoped_thread_local!(pub(crate) static CURRENT: Rc>); - impl Driver { pub(crate) fn new(b: &crate::Builder) -> io::Result { let uring = b.urb.build(b.entries)?; - let inner = Rc::new(RefCell::new(Inner { + Ok(Driver { ops: Ops::new(), uring, - })); - - Ok(Driver { inner }) + }) } - /// Enter the driver context. This enables using uring types. - pub(crate) fn with(&self, f: impl FnOnce() -> R) -> R { - CURRENT.set(&self.inner, f) + fn wait(&mut self) -> io::Result { + self.uring.submit_and_wait(1) } - pub(crate) fn tick(&self) { - let mut inner = self.inner.borrow_mut(); - inner.tick(); + fn num_operations(&mut self) -> usize { + self.ops.0.len() } - fn wait(&self) -> io::Result { - let mut inner = self.inner.borrow_mut(); - let inner = &mut *inner; - - inner.uring.submit_and_wait(1) - } - - fn num_operations(&self) -> usize { - let inner = self.inner.borrow(); - inner.ops.lifecycle.len() - } -} - -impl Inner { - fn tick(&mut self) { + pub(crate) fn tick(&mut self) { let mut cq = self.uring.completion(); cq.sync(); @@ -143,7 +114,7 @@ impl Inner { impl AsRawFd for Driver { fn as_raw_fd(&self) -> RawFd { - self.inner.borrow().uring.as_raw_fd() + self.uring.as_raw_fd() } } diff --git a/src/driver/op.rs b/src/driver/op.rs index eac3d5ca..f9ca3917 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -1,24 +1,24 @@ -use std::cell::RefCell; use std::future::Future; use std::io; +use std::marker::PhantomData; use std::pin::Pin; -use std::rc::Rc; use std::task::{Context, Poll, Waker}; use io_uring::{cqueue, squeue}; use crate::driver; +use crate::runtime::CONTEXT; +use crate::util::PhantomUnsendUnsync; /// In-flight operation pub(crate) struct Op { - // Driver running the operation - pub(super) driver: Rc>, - // Operation index in the slab pub(super) index: usize, // Per-operation data data: Option, + + _phantom: PhantomUnsendUnsync, } pub(crate) trait Completable { @@ -66,11 +66,11 @@ where T: Completable, { /// Create a new operation - fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc>) -> Self { + fn new(data: T, inner: &mut driver::Driver) -> Self { Op { - driver: inner_rc.clone(), index: inner.ops.insert(), data: Some(data), + _phantom: PhantomData, } } @@ -82,23 +82,22 @@ where where F: FnOnce(&mut T) -> squeue::Entry, { - driver::CURRENT.with(|inner_rc| { - let mut inner_ref = inner_rc.borrow_mut(); - let inner = &mut *inner_ref; - - // Create the operation - let mut op = Op::new(data, inner, inner_rc); - - // Configure the SQE - let sqe = f(op.data.as_mut().unwrap()).user_data(op.index as _); - - // Push the new operation - while unsafe { inner.uring.submission().push(&sqe).is_err() } { - // If the submission queue is full, flush it to the kernel - inner.submit()?; - } - - Ok(op) + CONTEXT.with(|cx| { + cx.with_driver(|driver| { + // Create the operation + let mut op = Op::new(data, driver); + + // Configure the SQE + let sqe = f(op.data.as_mut().unwrap()).user_data(op.index as _); + + // Push the new operation + while unsafe { driver.uring.submission().push(&sqe).is_err() } { + // If the submission queue is full, flush it to the kernel + driver.submit()?; + } + + Ok(op) + }) }) } @@ -107,7 +106,7 @@ where where F: FnOnce(&mut T) -> squeue::Entry, { - if driver::CURRENT.is_set() { + if CONTEXT.with(|cx| cx.is_set()) { Op::submit_with(data, f) } else { Err(io::ErrorKind::Other.into()) @@ -125,49 +124,60 @@ where use std::mem; let me = &mut *self; - let mut inner = me.driver.borrow_mut(); - let lifecycle = inner.ops.get_mut(me.index).expect("invalid internal state"); - match mem::replace(lifecycle, Lifecycle::Submitted) { - Lifecycle::Submitted => { - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => { - *lifecycle = Lifecycle::Waiting(cx.waker().clone()); - Poll::Pending - } - Lifecycle::Waiting(waker) => { - *lifecycle = Lifecycle::Waiting(waker); - Poll::Pending - } - Lifecycle::Ignored(..) => unreachable!(), - Lifecycle::Completed(cqe) => { - inner.ops.remove(me.index); - me.index = usize::MAX; - Poll::Ready(me.data.take().unwrap().complete(cqe)) - } - } + CONTEXT.with(|runtime_context| { + runtime_context.with_driver(|driver| { + let lifecycle = driver + .ops + .get_mut(me.index) + .expect("invalid internal state"); + + match mem::replace(lifecycle, Lifecycle::Submitted) { + Lifecycle::Submitted => { + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + Poll::Pending + } + Lifecycle::Waiting(waker) if !waker.will_wake(cx.waker()) => { + *lifecycle = Lifecycle::Waiting(cx.waker().clone()); + Poll::Pending + } + Lifecycle::Waiting(waker) => { + *lifecycle = Lifecycle::Waiting(waker); + Poll::Pending + } + Lifecycle::Ignored(..) => unreachable!(), + Lifecycle::Completed(cqe) => { + driver.ops.remove(me.index); + me.index = usize::MAX; + + Poll::Ready(me.data.take().unwrap().complete(cqe)) + } + } + }) + }) } } impl Drop for Op { fn drop(&mut self) { - let mut inner = self.driver.borrow_mut(); - let lifecycle = match inner.ops.get_mut(self.index) { - Some(lifecycle) => lifecycle, - None => return, - }; - - match lifecycle { - Lifecycle::Submitted | Lifecycle::Waiting(_) => { - *lifecycle = Lifecycle::Ignored(Box::new(self.data.take())); - } - Lifecycle::Completed(..) => { - inner.ops.remove(self.index); - } - Lifecycle::Ignored(..) => unreachable!(), - } + CONTEXT.with(|runtime_context| { + runtime_context.with_driver(|driver| { + let lifecycle = match driver.ops.get_mut(self.index) { + Some(lifecycle) => lifecycle, + None => return, + }; + + match lifecycle { + Lifecycle::Submitted | Lifecycle::Waiting(_) => { + *lifecycle = Lifecycle::Ignored(Box::new(self.data.take())); + } + Lifecycle::Completed(..) => { + driver.ops.remove(self.index); + } + Lifecycle::Ignored(..) => unreachable!(), + } + }) + }) } } @@ -220,24 +230,24 @@ mod test { #[test] fn op_stays_in_slab_on_drop() { - let (op, driver, data) = init(); + let (op, data) = init(); drop(op); assert_eq!(2, Rc::strong_count(&data)); - assert_eq!(1, driver.num_operations()); - release(driver); + assert_eq!(1, num_operations()); + release(); } #[test] fn poll_op_once() { - let (op, driver, data) = init(); + let (op, data) = init(); let mut op = task::spawn(op); assert_pending!(op.poll()); assert_eq!(2, Rc::strong_count(&data)); complete(&op, Ok(1)); - assert_eq!(1, driver.num_operations()); + assert_eq!(1, num_operations()); assert_eq!(2, Rc::strong_count(&data)); assert!(op.is_woken()); @@ -254,54 +264,58 @@ mod test { assert_eq!(1, Rc::strong_count(&data)); drop(op); - assert_eq!(0, driver.num_operations()); + assert_eq!(0, num_operations()); - release(driver); + release(); } #[test] fn poll_op_twice() { - let (op, driver, ..) = init(); - let mut op = task::spawn(op); - assert_pending!(op.poll()); - assert_pending!(op.poll()); - - complete(&op, Ok(1)); - - assert!(op.is_woken()); - let Completion { result, flags, .. } = assert_ready!(op.poll()); - assert_eq!(1, result.unwrap()); - assert_eq!(0, flags); + { + let (op, ..) = init(); + let mut op = task::spawn(op); + assert_pending!(op.poll()); + assert_pending!(op.poll()); + + complete(&op, Ok(1)); + + assert!(op.is_woken()); + let Completion { result, flags, .. } = assert_ready!(op.poll()); + assert_eq!(1, result.unwrap()); + assert_eq!(0, flags); + } - release(driver); + release(); } #[test] fn poll_change_task() { - let (op, driver, ..) = init(); - let mut op = task::spawn(op); - assert_pending!(op.poll()); + { + let (op, ..) = init(); + let mut op = task::spawn(op); + assert_pending!(op.poll()); - let op = op.into_inner(); - let mut op = task::spawn(op); - assert_pending!(op.poll()); + let op = op.into_inner(); + let mut op = task::spawn(op); + assert_pending!(op.poll()); - complete(&op, Ok(1)); + complete(&op, Ok(1)); - assert!(op.is_woken()); - let Completion { result, flags, .. } = assert_ready!(op.poll()); - assert_eq!(1, result.unwrap()); - assert_eq!(0, flags); + assert!(op.is_woken()); + let Completion { result, flags, .. } = assert_ready!(op.poll()); + assert_eq!(1, result.unwrap()); + assert_eq!(0, flags); + } - release(driver); + release(); } #[test] fn complete_before_poll() { - let (op, driver, data) = init(); + let (op, data) = init(); let mut op = task::spawn(op); complete(&op, Ok(1)); - assert_eq!(1, driver.num_operations()); + assert_eq!(1, num_operations()); assert_eq!(2, Rc::strong_count(&data)); let Completion { result, flags, .. } = assert_ready!(op.poll()); @@ -309,52 +323,63 @@ mod test { assert_eq!(0, flags); drop(op); - assert_eq!(0, driver.num_operations()); + assert_eq!(0, num_operations()); - release(driver); + release(); } #[test] fn complete_after_drop() { - let (op, driver, data) = init(); + let (op, data) = init(); let index = op.index; drop(op); assert_eq!(2, Rc::strong_count(&data)); - assert_eq!(1, driver.num_operations()); + assert_eq!(1, num_operations()); + let cqe = CqeResult { result: Ok(1), flags: 0, }; - driver.inner.borrow_mut().ops.complete(index, cqe); + + CONTEXT.with(|cx| cx.with_driver(|driver| driver.ops.complete(index, cqe))); + assert_eq!(1, Rc::strong_count(&data)); - assert_eq!(0, driver.num_operations()); - release(driver); + assert_eq!(0, num_operations()); + + release(); } - fn init() -> (Op>, crate::driver::Driver, Rc<()>) { + fn init() -> (Op>, Rc<()>) { use crate::driver::Driver; let driver = Driver::new(&crate::builder()).unwrap(); - let handle = driver.inner.clone(); let data = Rc::new(()); - let op = { - let mut inner = handle.borrow_mut(); - Op::new(data.clone(), &mut inner, &handle) - }; + let op = CONTEXT.with(|cx| { + cx.set_driver(driver); + + cx.with_driver(|driver| Op::new(data.clone(), driver)) + }); - (op, driver, data) + (op, data) + } + + fn num_operations() -> usize { + CONTEXT.with(|cx| cx.with_driver(|driver| driver.num_operations())) } fn complete(op: &Op>, result: io::Result) { let cqe = CqeResult { result, flags: 0 }; - op.driver.borrow_mut().ops.complete(op.index, cqe); + CONTEXT.with(|cx| cx.with_driver(|driver| driver.ops.complete(op.index, cqe))); } - fn release(driver: crate::driver::Driver) { - // Clear ops, we aren't really doing any I/O - driver.inner.borrow_mut().ops.lifecycle.clear(); + fn release() { + CONTEXT.with(|cx| { + cx.with_driver(|driver| driver.ops.lifecycle.clear()); + + cx.unset_driver(); + }); } } diff --git a/src/lib.rs b/src/lib.rs index a5625cf7..2e8d031d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,6 +73,7 @@ macro_rules! syscall { mod future; mod driver; mod runtime; +mod util; pub mod buf; pub mod fs; diff --git a/src/runtime/context.rs b/src/runtime/context.rs new file mode 100644 index 00000000..d925a53d --- /dev/null +++ b/src/runtime/context.rs @@ -0,0 +1,56 @@ +use crate::driver::Driver; +use crate::util::PhantomUnsendUnsync; +use std::cell::RefCell; +use std::marker::PhantomData; + +/// Owns the driver and resides in thread-local storage. +pub(crate) struct RuntimeContext { + driver: RefCell>, + _phantom: PhantomUnsendUnsync, +} + +impl RuntimeContext { + /// Construct the context with an uninitialized driver. + pub(crate) const fn new() -> Self { + Self { + driver: RefCell::new(None), + _phantom: PhantomData, + } + } + + /// Initialize the driver. + pub(crate) fn set_driver(&self, driver: Driver) { + let mut guard = self.driver.borrow_mut(); + + assert!(guard.is_none(), "Attempted to initialize the driver twice"); + + *guard = Some(driver); + } + + pub(crate) fn unset_driver(&self) { + let mut guard = self.driver.borrow_mut(); + + assert!(guard.is_some(), "Attempted to clear nonexistent driver"); + + *guard = None; + } + + /// Check if driver is initialized + pub(crate) fn is_set(&self) -> bool { + self.driver.borrow().is_some() + } + + /// Execute a function which requires access to the driver. + pub(crate) fn with_driver(&self, f: F) -> R + where + F: FnOnce(&mut Driver) -> R, + { + let mut guard = self.driver.borrow_mut(); + + let driver = guard + .as_mut() + .expect("Attempted to access driver in invalid context"); + + f(driver) + } +} diff --git a/src/runtime.rs b/src/runtime/mod.rs similarity index 58% rename from src/runtime.rs rename to src/runtime/mod.rs index a82154e2..9f8b012a 100644 --- a/src/runtime.rs +++ b/src/runtime/mod.rs @@ -1,15 +1,23 @@ -use crate::driver::{Driver, CURRENT}; -use std::cell::RefCell; +use crate::driver::Driver; use std::future::Future; use std::io; +use std::os::unix::io::{AsRawFd, RawFd}; use tokio::io::unix::AsyncFd; use tokio::task::LocalSet; +mod context; + +pub(crate) use context::RuntimeContext; + +thread_local! { + pub(crate) static CONTEXT: RuntimeContext = RuntimeContext::new(); +} + /// The Runtime executor pub struct Runtime { /// io-uring driver - driver: AsyncFd, + uring_fd: RawFd, /// LocalSet for !Send tasks local: LocalSet, @@ -44,7 +52,7 @@ pub struct Runtime { /// handle.await.unwrap(); /// }); /// ``` -pub fn spawn(task: T) -> tokio::task::JoinHandle { +pub fn spawn(task: T) -> tokio::task::JoinHandle { tokio::task::spawn_local(task) } @@ -53,8 +61,8 @@ impl Runtime { pub fn new(b: &crate::Builder) -> io::Result { let rt = tokio::runtime::Builder::new_current_thread() .on_thread_park(|| { - CURRENT.with(|x| { - let _ = RefCell::borrow_mut(x).uring.submit(); + CONTEXT.with(|x| { + let _ = x.with_driver(|d| d.uring.submit()); }); }) .enable_all() @@ -62,12 +70,17 @@ impl Runtime { let local = LocalSet::new(); - let driver = { - let _guard = rt.enter(); - AsyncFd::new(Driver::new(b)?)? - }; + let driver = Driver::new(b)?; + + let driver_fd = driver.as_raw_fd(); - Ok(Runtime { driver, local, rt }) + CONTEXT.with(|cx| cx.set_driver(driver)); + + Ok(Runtime { + uring_fd: driver_fd, + local, + rt, + }) } /// Runs a future to completion on the current runtime @@ -75,24 +88,34 @@ impl Runtime { where F: Future, { - self.driver.get_ref().with(|| { - let drive = async { + let drive = { + let _guard = self.rt.enter(); + let driver = AsyncFd::new(self.uring_fd).unwrap(); + + async move { loop { // Wait for read-readiness - let mut guard = self.driver.readable().await.unwrap(); - self.driver.get_ref().tick(); + let mut guard = driver.readable().await.unwrap(); + CONTEXT.with(|cx| cx.with_driver(|driver| driver.tick())); guard.clear_ready(); } - }; + } + }; - tokio::pin!(drive); - tokio::pin!(future); + tokio::pin!(future); - self.rt - .block_on(self.local.run_until(crate::future::poll_fn(|cx| { - assert!(drive.as_mut().poll(cx).is_pending()); - future.as_mut().poll(cx) - }))) - }) + self.local.spawn_local(drive); + + self.rt + .block_on(self.local.run_until(crate::future::poll_fn(|cx| { + // assert!(drive.as_mut().poll(cx).is_pending()); + future.as_mut().poll(cx) + }))) + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + CONTEXT.with(|rc| rc.unset_driver()) } } diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 00000000..6fd08051 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,4 @@ +use std::marker::PhantomData; + +/// Utility ZST for ensuring that opcodes are `!Send` and `!Sync`. +pub(crate) type PhantomUnsendUnsync = PhantomData<*mut ()>; From 78b5f730d0b82ca176d2ba7ff76c7bd87f132d27 Mon Sep 17 00:00:00 2001 From: noah Date: Sat, 22 Oct 2022 19:52:39 -0500 Subject: [PATCH 2/6] fix double panic --- src/runtime/context.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/runtime/context.rs b/src/runtime/context.rs index d925a53d..aa5d5b10 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -37,7 +37,10 @@ impl RuntimeContext { /// Check if driver is initialized pub(crate) fn is_set(&self) -> bool { - self.driver.borrow().is_some() + self.driver + .try_borrow() + .map(|b| b.is_some()) + .unwrap_or(false) } /// Execute a function which requires access to the driver. From 62b42abe4334e5d628e6262ab6361dd08d7f4e06 Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Wed, 26 Oct 2022 12:21:08 -0500 Subject: [PATCH 3/6] resolve conflicts --- src/driver/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 48020b19..4a584f49 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -39,7 +39,7 @@ mod write; mod writev; -use io_uring::{cqueue, IoUring}; +use io_uring::IoUring; use slab::Slab; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; @@ -73,7 +73,7 @@ impl Driver { } fn num_operations(&mut self) -> usize { - self.ops.0.len() + self.ops.lifecycle.len() } pub(crate) fn tick(&mut self) { From 1e77f99feb9d8882ff8ee32b37d53c1f39c8007d Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Wed, 26 Oct 2022 13:15:09 -0500 Subject: [PATCH 4/6] remove unnecessary mut --- src/driver/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 4a584f49..87dc75b2 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -68,11 +68,11 @@ impl Driver { }) } - fn wait(&mut self) -> io::Result { + fn wait(&self) -> io::Result { self.uring.submit_and_wait(1) } - fn num_operations(&mut self) -> usize { + fn num_operations(&self) -> usize { self.ops.lifecycle.len() } From ba81cc9f765831cb0f9e245dafd03445635b441e Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Wed, 26 Oct 2022 13:16:09 -0500 Subject: [PATCH 5/6] fix comment --- src/runtime/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/context.rs b/src/runtime/context.rs index aa5d5b10..d34b6a77 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -43,7 +43,7 @@ impl RuntimeContext { .unwrap_or(false) } - /// Execute a function which requires access to the driver. + /// Execute a function which requires mutable access to the driver. pub(crate) fn with_driver(&self, f: F) -> R where F: FnOnce(&mut Driver) -> R, From ab2686174631c25cd50aa7676be28fa4e28ba982 Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Wed, 26 Oct 2022 13:16:37 -0500 Subject: [PATCH 6/6] rename with_driver --- src/driver/op.rs | 16 ++++++++-------- src/runtime/context.rs | 2 +- src/runtime/mod.rs | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/driver/op.rs b/src/driver/op.rs index f9ca3917..992e35bb 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -83,7 +83,7 @@ where F: FnOnce(&mut T) -> squeue::Entry, { CONTEXT.with(|cx| { - cx.with_driver(|driver| { + cx.with_driver_mut(|driver| { // Create the operation let mut op = Op::new(data, driver); @@ -126,7 +126,7 @@ where let me = &mut *self; CONTEXT.with(|runtime_context| { - runtime_context.with_driver(|driver| { + runtime_context.with_driver_mut(|driver| { let lifecycle = driver .ops .get_mut(me.index) @@ -161,7 +161,7 @@ where impl Drop for Op { fn drop(&mut self) { CONTEXT.with(|runtime_context| { - runtime_context.with_driver(|driver| { + runtime_context.with_driver_mut(|driver| { let lifecycle = match driver.ops.get_mut(self.index) { Some(lifecycle) => lifecycle, None => return, @@ -343,7 +343,7 @@ mod test { flags: 0, }; - CONTEXT.with(|cx| cx.with_driver(|driver| driver.ops.complete(index, cqe))); + CONTEXT.with(|cx| cx.with_driver_mut(|driver| driver.ops.complete(index, cqe))); assert_eq!(1, Rc::strong_count(&data)); assert_eq!(0, num_operations()); @@ -360,24 +360,24 @@ mod test { let op = CONTEXT.with(|cx| { cx.set_driver(driver); - cx.with_driver(|driver| Op::new(data.clone(), driver)) + cx.with_driver_mut(|driver| Op::new(data.clone(), driver)) }); (op, data) } fn num_operations() -> usize { - CONTEXT.with(|cx| cx.with_driver(|driver| driver.num_operations())) + CONTEXT.with(|cx| cx.with_driver_mut(|driver| driver.num_operations())) } fn complete(op: &Op>, result: io::Result) { let cqe = CqeResult { result, flags: 0 }; - CONTEXT.with(|cx| cx.with_driver(|driver| driver.ops.complete(op.index, cqe))); + CONTEXT.with(|cx| cx.with_driver_mut(|driver| driver.ops.complete(op.index, cqe))); } fn release() { CONTEXT.with(|cx| { - cx.with_driver(|driver| driver.ops.lifecycle.clear()); + cx.with_driver_mut(|driver| driver.ops.lifecycle.clear()); cx.unset_driver(); }); diff --git a/src/runtime/context.rs b/src/runtime/context.rs index d34b6a77..df97b84b 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -44,7 +44,7 @@ impl RuntimeContext { } /// Execute a function which requires mutable access to the driver. - pub(crate) fn with_driver(&self, f: F) -> R + pub(crate) fn with_driver_mut(&self, f: F) -> R where F: FnOnce(&mut Driver) -> R, { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 9f8b012a..c1e5dd26 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -62,7 +62,7 @@ impl Runtime { let rt = tokio::runtime::Builder::new_current_thread() .on_thread_park(|| { CONTEXT.with(|x| { - let _ = x.with_driver(|d| d.uring.submit()); + let _ = x.with_driver_mut(|d| d.uring.submit()); }); }) .enable_all() @@ -96,7 +96,7 @@ impl Runtime { loop { // Wait for read-readiness let mut guard = driver.readable().await.unwrap(); - CONTEXT.with(|cx| cx.with_driver(|driver| driver.tick())); + CONTEXT.with(|cx| cx.with_driver_mut(|driver| driver.tick())); guard.clear_ready(); } }