diff --git a/.github/workflows/ci_check.yml b/.github/workflows/ci_check.yml index d7ef7eaf..202266f0 100644 --- a/.github/workflows/ci_check.yml +++ b/.github/workflows/ci_check.yml @@ -20,7 +20,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ ubuntu-latest, windows-latest, macos-latest ] + os: [ ubuntu-22.04, windows-latest, macos-latest ] steps: - uses: actions/checkout@v4 - name: Setup Rust Toolchain diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 71a8720c..125ec10d 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -216,7 +216,12 @@ impl Runtime { let res = std::panic::catch_unwind(AssertUnwindSafe(f)); BufResult(Ok(0), res) }); - unsafe { self.spawn_unchecked(self.submit(op).map(|res| res.1.into_inner())) } + // It is safe and sound to use `submit` here because the task is spawned + // immediately. + #[allow(deprecated)] + unsafe { + self.spawn_unchecked(self.submit(op).map(|res| res.1.into_inner())) + } } /// Attach a raw file descriptor/handle/socket to the runtime. @@ -234,7 +239,13 @@ impl Runtime { /// Submit an operation to the runtime. /// /// You only need this when authoring your own [`OpCode`]. + /// + /// It is safe to send the returned future to another runtime and poll it, + /// but the exact behavior is not guaranteed, e.g. it may return pending + /// forever or else. + #[deprecated = "use compio::runtime::submit instead"] pub fn submit(&self, op: T) -> impl Future> { + #[allow(deprecated)] self.submit_with_flags(op).map(|(res, _)| res) } @@ -244,6 +255,11 @@ impl Runtime { /// the flags /// /// You only need this when authoring your own [`OpCode`]. + /// + /// It is safe to send the returned future to another runtime and poll it, + /// but the exact behavior is not guaranteed, e.g. it may return pending + /// forever or else. + #[deprecated = "use compio::runtime::submit_with_flags instead"] pub fn submit_with_flags( &self, op: T, @@ -258,11 +274,6 @@ impl Runtime { } } - #[cfg(feature = "time")] - pub(crate) fn create_timer(&self, instant: std::time::Instant) -> impl Future { - TimerFuture::new(instant) - } - pub(crate) fn cancel_op(&self, op: Key) { self.driver.borrow_mut().cancel(op); } @@ -285,21 +296,6 @@ impl Runtime { }) } - #[cfg(feature = "time")] - pub(crate) fn register_timer( - &self, - cx: &mut Context, - instant: std::time::Instant, - ) -> Option { - let mut timer_runtime = self.timer_runtime.borrow_mut(); - if let Some(key) = timer_runtime.insert(instant) { - timer_runtime.update_waker(key, cx.waker().clone()); - Some(key) - } else { - None - } - } - #[cfg(feature = "time")] pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> { instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key); @@ -478,29 +474,37 @@ pub fn spawn_blocking( /// Submit an operation to the current runtime, and return a future for it. /// -/// It is safe but unspecified behavior to send the returned future to another -/// runtime and poll it. -/// /// ## Panics /// /// This method doesn't create runtime. It tries to obtain the current runtime /// by [`Runtime::with_current`]. -pub fn submit(op: T) -> impl Future> { - Runtime::with_current(|r| r.submit(op)) +pub async fn submit(op: T) -> BufResult { + submit_with_flags(op).await.0 } /// Submit an operation to the current runtime, and return a future for it with /// flags. /// -/// It is safe but unspecified behavior to send the returned future to another -/// runtime and poll it. -/// /// ## Panics /// /// This method doesn't create runtime. It tries to obtain the current runtime /// by [`Runtime::with_current`]. -pub fn submit_with_flags( - op: T, -) -> impl Future, u32)> { - Runtime::with_current(|r| r.submit_with_flags(op)) +pub async fn submit_with_flags(op: T) -> (BufResult, u32) { + let state = Runtime::with_current(|r| r.submit_raw(op)); + match state { + PushEntry::Pending(user_data) => OpFuture::new(user_data).await, + PushEntry::Ready(res) => { + // submit_flags won't be ready immediately, if ready, it must be error without + // flags, or the flags are not necessary + (res, 0) + } + } +} + +#[cfg(feature = "time")] +pub(crate) async fn create_timer(instant: std::time::Instant) { + let key = Runtime::with_current(|r| r.timer_runtime.borrow_mut().insert(instant)); + if let Some(key) = key { + TimerFuture::new(key).await + } } diff --git a/compio-runtime/src/runtime/time.rs b/compio-runtime/src/runtime/time.rs index 5776f869..c837957b 100644 --- a/compio-runtime/src/runtime/time.rs +++ b/compio-runtime/src/runtime/time.rs @@ -7,7 +7,6 @@ use std::{ time::{Duration, Instant}, }; -use futures_util::future::Either; use slab::Slab; use crate::runtime::Runtime; @@ -125,39 +124,26 @@ impl TimerRuntime { } pub struct TimerFuture { - key: Either, + key: usize, } impl TimerFuture { - pub fn new(instant: Instant) -> Self { - Self { - key: Either::Left(instant), - } + pub fn new(key: usize) -> Self { + Self { key } } } impl Future for TimerFuture { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Runtime::with_current(|r| match self.key { - Either::Left(instant) => match r.register_timer(cx, instant) { - Some(key) => { - self.key = Either::Right(key); - Poll::Pending - } - None => Poll::Ready(()), - }, - Either::Right(key) => r.poll_timer(cx, key), - }) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Runtime::with_current(|r| r.poll_timer(cx, self.key)) } } impl Drop for TimerFuture { fn drop(&mut self) { - if let Either::Right(key) = self.key { - Runtime::with_current(|r| r.cancel_timer(key)); - } + Runtime::with_current(|r| r.cancel_timer(self.key)); } } diff --git a/compio-runtime/src/time.rs b/compio-runtime/src/time.rs index 73b88778..7a99dc17 100644 --- a/compio-runtime/src/time.rs +++ b/compio-runtime/src/time.rs @@ -9,8 +9,6 @@ use std::{ use futures_util::{FutureExt, select}; -use crate::Runtime; - /// Waits until `duration` has elapsed. /// /// Equivalent to [`sleep_until(Instant::now() + duration)`](sleep_until). An @@ -55,7 +53,7 @@ pub async fn sleep(duration: Duration) { /// # }) /// ``` pub async fn sleep_until(deadline: Instant) { - Runtime::with_current(|r| r.create_timer(deadline)).await + crate::runtime::create_timer(deadline).await } /// Error returned by [`timeout`] or [`timeout_at`].