From 163d8417e1bab1e17e2beaa45a7575e76eb09202 Mon Sep 17 00:00:00 2001 From: Timo Glane Date: Mon, 13 May 2024 18:33:50 +0200 Subject: [PATCH 1/7] task: Drop the join waker of a task eagerly when the JoinHandle gets dropped or the task completes Currently, the waker registered with a JoinHandle is not dropped until the task allocation is dropped. This behaviour may cause the memory allocated by a task to not be freed when in the case of two tasks awaiting each others JoinHandle. This commit changes the behaviour by actively dropping the waker when the JoinHandle gets dropped (or the task completes in some cases). --- spellcheck.dic | 1 + tokio/src/runtime/task/harness.rs | 48 ++++++++++++++++++-- tokio/src/runtime/task/mod.rs | 5 ++ tokio/src/runtime/task/state.rs | 45 ++++++++++++------ tokio/src/runtime/tests/loom_multi_thread.rs | 30 ++++++++++++ 5 files changed, 112 insertions(+), 17 deletions(-) diff --git a/spellcheck.dic b/spellcheck.dic index f368d2d7214..4b6dbd910f8 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -274,6 +274,7 @@ unparks Unparks unreceived unsafety +unsets Unsets unsynchronized untrusted diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 996f0f2d9b4..bf74d48dada 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -284,9 +284,11 @@ where } pub(super) fn drop_join_handle_slow(self) { - // Try to unset `JOIN_INTEREST`. This must be done as a first step in + // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in // case the task concurrently completed. - if self.state().unset_join_interested().is_err() { + let snapshot = self.state().transition_to_join_handle_dropped(); + + if snapshot.is_complete() { // It is our responsibility to drop the output. This is critical as // the task output may not be `Send` and as such must remain with // the scheduler or `JoinHandle`. i.e. if the output remains in the @@ -301,6 +303,25 @@ where })); } + if !snapshot.is_join_waker_set() { + // If the JOIN_WAKER flag is unset at this point, the task is either + // already terminal or not complete so the `JoinHandle` is responsible + // for dropping the waker. + // Safety: + // If the JOIN_WAKER bit is not set the join handle has exclusive + // access to the waker as per rule 2 in task/mod.rs. + // This can only be the case at this point in two scenarios: + // 1. The task completed and the runtime unset `JOIN_WAKER` flag + // after accessing the waker during task completion. So the + // `JoinHandle` is the only one to access the join waker here. + // 2. The task is not completed so the `JoinHandle` was able to unset + // `JOIN_WAKER` bit itself to get mutable access to the waker. + // The runtime will not access the waker when this flag is unset. + unsafe { + self.trailer().set_waker(None); + } + } + // Drop the `JoinHandle` reference, possibly deallocating the task self.drop_reference(); } @@ -311,7 +332,6 @@ where fn complete(self) { // The future has completed and its output has been written to the task // stage. We transition from running to complete. - let snapshot = self.state().transition_to_complete(); // We catch panics here in case dropping the future or waking the @@ -320,13 +340,33 @@ where if !snapshot.is_join_interested() { // The `JoinHandle` is not interested in the output of // this task. It is our responsibility to drop the - // output. + // output. The join waker was already dropped by the + // `JoinHandle` before. self.core().drop_future_or_output(); } else if snapshot.is_join_waker_set() { // Notify the waker. Reading the waker field is safe per rule 4 // in task/mod.rs, since the JOIN_WAKER bit is set and the call // to transition_to_complete() above set the COMPLETE bit. self.trailer().wake_join(); + + // If JOIN_INTEREST is still set at this point the `JoinHandle` + // was not dropped since setting COMPLETE so we unset JOIN_WAKER + // to give the responsibility of dropping the join waker back to + // the `JoinHandle`. `JoinHandle` is able to drop the waker when + // itself gets dropped. + if self.state().unset_waker_if_join_interested().is_err() { + // Unsetting JOIN_WAKER flag will fail if JOIN_INTERESTED is + // not set to indicate that the runtime has the responsibility + // to drop the join waker here as per rule 7 in task/mod.rs. + // Safety: + // If JOIN_INTEREST got unset since setting COMPLETE we are + // the only ones to have access to the join waker and need + // to drop it here because the `JoinHandle` of the task + // already got dropped. + unsafe { + self.trailer().set_waker(None); + } + } } })); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 33f54003d38..cfcf6f7d49e 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -94,10 +94,15 @@ //! `JoinHandle` needs to (i) successfully set `JOIN_WAKER` to zero if it is //! not already zero to gain exclusive access to the waker field per rule //! 2, (ii) write a waker, and (iii) successfully set `JOIN_WAKER` to one. +//! If the `JoinHandle` unsets `JOIN_WAKER` in the process of being dropped +//! to clear the waker field, only steps (i) and (ii) are relevant. //! //! 6. The `JoinHandle` can change `JOIN_WAKER` only if COMPLETE is zero (i.e. //! the task hasn't yet completed). //! +//! 7. If `JOIN_INTEREST` is zero and COMPLETE is one, then the runtime has +//! exclusive (mutable) access to the waker field. +//! //! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a //! race. If step (i) fails, then the attempt to write a waker is aborted. If //! step (iii) fails because COMPLETE is set to one by another thread after diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 0fc7bb0329b..35db3e8e453 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -371,22 +371,21 @@ impl State { .map_err(|_| ()) } - /// Tries to unset the `JOIN_INTEREST` flag. - /// - /// Returns `Ok` if the operation happens before the task transitions to a - /// completed state, `Err` otherwise. - pub(super) fn unset_join_interested(&self) -> UpdateResult { - self.fetch_update(|curr| { - assert!(curr.is_join_interested()); + /// Unsets the `JOIN_INTEREST` flag. If `COMPLETE` is not set, the `JOIN_WAKER` + /// flag is also unset. + pub(super) fn transition_to_join_handle_dropped(&self) -> Snapshot { + self.fetch_update_action(|mut snapshot| { + assert!(snapshot.is_join_interested()); - if curr.is_complete() { - return None; - } + snapshot.unset_join_interested(); - let mut next = curr; - next.unset_join_interested(); + if !snapshot.is_complete() { + // If `COMPLETE` is unset we also unset `JOIN_WAKER` to give the + // `JoinHandle` exclusive access to the waker to drop it. + snapshot.unset_join_waker(); + } - Some(next) + (snapshot, Some(snapshot)) }) } @@ -430,6 +429,26 @@ impl State { }) } + /// Unsets the `JOIN_WAKER` bit only if the `JOIN_INTEREST` is still set. + /// + /// Returns `Ok` has been unset, `Err` otherwise. This operation requires + /// the task to be completed. + pub(super) fn unset_waker_if_join_interested(&self) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_complete()); + assert!(curr.is_join_waker_set()); + + if !curr.is_join_interested() { + return None; + } + + let mut next = curr; + next.unset_join_waker(); + + Some(next) + }) + } + pub(super) fn ref_inc(&self) { use std::process; use std::sync::atomic::Ordering::Relaxed; diff --git a/tokio/src/runtime/tests/loom_multi_thread.rs b/tokio/src/runtime/tests/loom_multi_thread.rs index ddd14b7fb3f..e2706e65c65 100644 --- a/tokio/src/runtime/tests/loom_multi_thread.rs +++ b/tokio/src/runtime/tests/loom_multi_thread.rs @@ -10,6 +10,7 @@ mod yield_now; /// In order to speed up the C use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; +use crate::sync::mpsc::channel; use crate::{spawn, task}; use tokio_test::assert_ok; @@ -459,3 +460,32 @@ impl Future for Track { }) } } + +#[test] +fn drop_tasks_with_reference_cycle() { + loom::model(|| { + let pool = mk_pool(2); + + pool.block_on(async move { + let (tx, mut rx) = channel(1); + + let (a_closer, mut wait_for_close_a) = channel::<()>(1); + let (b_closer, mut wait_for_close_b) = channel::<()>(1); + + let a = spawn(async move { + let b = rx.recv().await.unwrap(); + + futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await; + }); + + let b = spawn(async move { + let _ = a.await; + let _ = b_closer.send(()).await; + }); + + tx.send(b).await.unwrap(); + + futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await; + }); + }); +} From d457bbb735c31c676c0cc953a61728d949016e8d Mon Sep 17 00:00:00 2001 From: Timo Glane Date: Mon, 25 Nov 2024 12:18:07 +0100 Subject: [PATCH 2/7] Updated documentaion of the JOIN_WAKER bit flag following the review recommendations --- tokio/src/runtime/task/mod.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index cfcf6f7d49e..15c5a8f4afe 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -98,17 +98,26 @@ //! to clear the waker field, only steps (i) and (ii) are relevant. //! //! 6. The `JoinHandle` can change `JOIN_WAKER` only if COMPLETE is zero (i.e. -//! the task hasn't yet completed). +//! the task hasn't yet completed). The runtime can change `JOIN_WAKER` only +//! if COMPLETE is one. //! //! 7. If `JOIN_INTEREST` is zero and COMPLETE is one, then the runtime has -//! exclusive (mutable) access to the waker field. +//! exclusive (mutable) access to the waker field. This might happen if the +//! `JoinHandle` gets dropped right after the task completes and the runtime +//! sets the `COMPLETE` bit. In this case the runtime needs the mutable access +//! to the waker field to drop it. //! //! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a //! race. If step (i) fails, then the attempt to write a waker is aborted. If //! step (iii) fails because COMPLETE is set to one by another thread after //! step (i), then the waker field is cleared. Once COMPLETE is one (i.e. //! task has completed), the `JoinHandle` will not modify `JOIN_WAKER`. After the -//! runtime sets COMPLETE to one, it invokes the waker if there is one. +//! runtime sets COMPLETE to one, it invokes the waker if there is one so in this +//! case when a task completes the `JOIN_WAKER` bit implicates to the runtime +//! whether it should invoke the waker or not. After the runtime is done with +//! using the waker during task completion, it unsets the `JOIN_WAKER` bit to give +//! the `JoinHandle` exclusive access again so that it is able to drop the waker +//! at a later point. //! //! All other fields are immutable and can be accessed immutably without //! synchronization by anyone. From ef31f18b572632759d233690bcb7eb726d1acd2d Mon Sep 17 00:00:00 2001 From: Timo Glane Date: Wed, 4 Dec 2024 22:05:03 +0100 Subject: [PATCH 3/7] Review recommendations: * `State::transition_do_join_handle_drop` now returns a struct with to booleans to indicate what the JoinHandle should drop * Use `fetch_and` when unsetting JOIN_WAKER after COMPLETE --- tokio/src/runtime/task/harness.rs | 39 +++++++++------------- tokio/src/runtime/task/state.rs | 55 ++++++++++++++++++++----------- 2 files changed, 51 insertions(+), 43 deletions(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index bf74d48dada..c8b780bec29 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -286,9 +286,9 @@ where pub(super) fn drop_join_handle_slow(self) { // Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in // case the task concurrently completed. - let snapshot = self.state().transition_to_join_handle_dropped(); + let transition = self.state().transition_to_join_handle_dropped(); - if snapshot.is_complete() { + if transition.drop_output { // It is our responsibility to drop the output. This is critical as // the task output may not be `Send` and as such must remain with // the scheduler or `JoinHandle`. i.e. if the output remains in the @@ -303,7 +303,7 @@ where })); } - if !snapshot.is_join_waker_set() { + if transition.drop_waker { // If the JOIN_WAKER flag is unset at this point, the task is either // already terminal or not complete so the `JoinHandle` is responsible // for dropping the waker. @@ -317,9 +317,7 @@ where // 2. The task is not completed so the `JoinHandle` was able to unset // `JOIN_WAKER` bit itself to get mutable access to the waker. // The runtime will not access the waker when this flag is unset. - unsafe { - self.trailer().set_waker(None); - } + unsafe { self.trailer().set_waker(None) } } // Drop the `JoinHandle` reference, possibly deallocating the task @@ -349,23 +347,18 @@ where // to transition_to_complete() above set the COMPLETE bit. self.trailer().wake_join(); - // If JOIN_INTEREST is still set at this point the `JoinHandle` - // was not dropped since setting COMPLETE so we unset JOIN_WAKER - // to give the responsibility of dropping the join waker back to - // the `JoinHandle`. `JoinHandle` is able to drop the waker when - // itself gets dropped. - if self.state().unset_waker_if_join_interested().is_err() { - // Unsetting JOIN_WAKER flag will fail if JOIN_INTERESTED is - // not set to indicate that the runtime has the responsibility - // to drop the join waker here as per rule 7 in task/mod.rs. - // Safety: - // If JOIN_INTEREST got unset since setting COMPLETE we are - // the only ones to have access to the join waker and need - // to drop it here because the `JoinHandle` of the task - // already got dropped. - unsafe { - self.trailer().set_waker(None); - } + // Inform the `JoinHandle` that we are done waking the waker by + // unsetting the `JOIN_WAKER` bit. If the `JoinHandle` has + // already been dropped and `JOIN_INTEREST` is unset, then we must + // drop the waker ourselves. + if !self + .state() + .unset_waker_after_complete() + .is_join_interested() + { + // SAFETY: We have COMPLETE=1 and JOIN_INTEREST=0, so + // we have exclusive access to the waker. + unsafe { self.trailer().set_waker(None) } } } })); diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 35db3e8e453..6ac8de15b8b 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -89,6 +89,12 @@ pub(crate) enum TransitionToNotifiedByRef { Submit, } +#[must_use] +pub(super) struct TransitionToJoinHandleDrop { + pub(super) drop_waker: bool, + pub(super) drop_output: bool, +} + /// All transitions are performed via RMW operations. This establishes an /// unambiguous modification order. impl State { @@ -373,19 +379,38 @@ impl State { /// Unsets the `JOIN_INTEREST` flag. If `COMPLETE` is not set, the `JOIN_WAKER` /// flag is also unset. - pub(super) fn transition_to_join_handle_dropped(&self) -> Snapshot { + /// The returned `TransitionToJoinHandleDrop` indicates whether the `JoinHandle` should drop + /// the output of the future or the join waker after the transition. + pub(super) fn transition_to_join_handle_dropped(&self) -> TransitionToJoinHandleDrop { self.fetch_update_action(|mut snapshot| { assert!(snapshot.is_join_interested()); + let mut transition = TransitionToJoinHandleDrop { + drop_waker: false, + drop_output: false, + }; + snapshot.unset_join_interested(); if !snapshot.is_complete() { // If `COMPLETE` is unset we also unset `JOIN_WAKER` to give the - // `JoinHandle` exclusive access to the waker to drop it. + // `JoinHandle` exclusive access to the waker following rule 6 in task/mod.rs. + // The `JoinHandle` will drop the waker if it has exclusive access + // to drop it. snapshot.unset_join_waker(); + } else { + // If `COMPLETE` is set the task is completed so the `JoinHandle` is responsible + // for dropping the output. + transition.drop_output = true; + } + + if !snapshot.is_join_waker_set() { + // If the `JOIN_WAKER` bit is unset and the `JOIN_HANDLE` has exclusive access to + // the the join waker and should drop it following this transition. + transition.drop_waker = true; } - (snapshot, Some(snapshot)) + (transition, Some(snapshot)) }) } @@ -429,24 +454,14 @@ impl State { }) } - /// Unsets the `JOIN_WAKER` bit only if the `JOIN_INTEREST` is still set. + /// Unsets the `JOIN_WAKER` bit unconditionally after task completion. /// - /// Returns `Ok` has been unset, `Err` otherwise. This operation requires - /// the task to be completed. - pub(super) fn unset_waker_if_join_interested(&self) -> UpdateResult { - self.fetch_update(|curr| { - assert!(curr.is_complete()); - assert!(curr.is_join_waker_set()); - - if !curr.is_join_interested() { - return None; - } - - let mut next = curr; - next.unset_join_waker(); - - Some(next) - }) + /// This operation requires the task to be completed. + pub(super) fn unset_waker_after_complete(&self) -> Snapshot { + let prev = Snapshot(self.val.fetch_and(!JOIN_WAKER, AcqRel)); + assert!(prev.is_complete()); + assert!(prev.is_join_waker_set()); + Snapshot(prev.0 & !JOIN_WAKER) } pub(super) fn ref_inc(&self) { From cea78d001a4bfaf51bd4367c14765abf38b93590 Mon Sep 17 00:00:00 2001 From: Timo Glane Date: Thu, 5 Dec 2024 17:20:05 +0100 Subject: [PATCH 4/7] Review recommendations: * Added missing ';' after unsafe blocks * Improved documentation in `State::transition_to_join_handle_dropped` on the scenarios in which `JOIN_WAKER` gets unset --- tokio/src/runtime/task/harness.rs | 4 ++-- tokio/src/runtime/task/state.rs | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index c8b780bec29..9bf73b74fbf 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -317,7 +317,7 @@ where // 2. The task is not completed so the `JoinHandle` was able to unset // `JOIN_WAKER` bit itself to get mutable access to the waker. // The runtime will not access the waker when this flag is unset. - unsafe { self.trailer().set_waker(None) } + unsafe { self.trailer().set_waker(None) }; } // Drop the `JoinHandle` reference, possibly deallocating the task @@ -358,7 +358,7 @@ where { // SAFETY: We have COMPLETE=1 and JOIN_INTEREST=0, so // we have exclusive access to the waker. - unsafe { self.trailer().set_waker(None) } + unsafe { self.trailer().set_waker(None) }; } } })); diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 6ac8de15b8b..037c1c90c61 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -406,7 +406,12 @@ impl State { if !snapshot.is_join_waker_set() { // If the `JOIN_WAKER` bit is unset and the `JOIN_HANDLE` has exclusive access to - // the the join waker and should drop it following this transition. + // the join waker and should drop it following this transition. + // This might happen in two situations: + // 1. The task is not completed and we just unset the `JOIN_WAKer` above in this + // function. + // 2. The task is completed. In that case the `JOIN_WAKER` bit was already unset + // by the runtime during completion. transition.drop_waker = true; } From 4733c9f33da8913dc2ac2433d7ef97c21492a9e2 Mon Sep 17 00:00:00 2001 From: Timo Glane Date: Sat, 21 Dec 2024 10:30:08 +0100 Subject: [PATCH 5/7] Added new test to drop join handles druing schedule and moved existing ref cycle test into a non-loom test (leaks will be detected by miri) --- .../src/runtime/tests/loom_current_thread.rs | 58 ++++++++++++++++++- tokio/src/runtime/tests/loom_multi_thread.rs | 30 ---------- tokio/src/runtime/tests/task.rs | 39 ++++++++++++- 3 files changed, 94 insertions(+), 33 deletions(-) diff --git a/tokio/src/runtime/tests/loom_current_thread.rs b/tokio/src/runtime/tests/loom_current_thread.rs index edda6e49954..fd0a44314f8 100644 --- a/tokio/src/runtime/tests/loom_current_thread.rs +++ b/tokio/src/runtime/tests/loom_current_thread.rs @@ -1,6 +1,6 @@ mod yield_now; -use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::atomic::{AtomicUsize, Ordering}; use crate::loom::sync::Arc; use crate::loom::thread; use crate::runtime::{Builder, Runtime}; @@ -9,7 +9,7 @@ use crate::task; use std::future::Future; use std::pin::Pin; use std::sync::atomic::Ordering::{Acquire, Release}; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; fn assert_at_most_num_polls(rt: Arc, at_most_polls: usize) { let (tx, rx) = oneshot::channel(); @@ -106,6 +106,60 @@ fn assert_no_unnecessary_polls() { }); } +#[test] +fn drop_jh_during_schedule() { + unsafe fn waker_clone(ptr: *const ()) -> RawWaker { + let atomic = unsafe { &*(ptr as *const AtomicUsize) }; + atomic.fetch_add(1, Ordering::Relaxed); + RawWaker::new(ptr, &VTABLE) + } + unsafe fn waker_drop(ptr: *const ()) { + let atomic = unsafe { &*(ptr as *const AtomicUsize) }; + atomic.fetch_sub(1, Ordering::Relaxed); + } + unsafe fn waker_nop(_ptr: *const ()) {} + + static VTABLE: RawWakerVTable = + RawWakerVTable::new(waker_clone, waker_drop, waker_nop, waker_drop); + + loom::model(|| { + let rt = Builder::new_current_thread().build().unwrap(); + + let mut jh = rt.spawn(async {}); + // Using AbortHandle to increment task refcount. This ensures that the waker is not + // destroyed due to the refcount hitting zero. + let task_refcnt = jh.abort_handle(); + + let waker_refcnt = AtomicUsize::new(1); + { + // Set up the join waker. + use std::future::Future; + use std::pin::Pin; + + // SAFETY: Before `waker_refcnt` goes out of scope, this test asserts that the refcnt + // has dropped to zero. + let join_waker = unsafe { + Waker::from_raw(RawWaker::new( + (&waker_refcnt) as *const AtomicUsize as *const (), + &VTABLE, + )) + }; + + assert!(Pin::new(&mut jh) + .poll(&mut Context::from_waker(&join_waker)) + .is_pending()); + } + assert_eq!(waker_refcnt.load(Ordering::Relaxed), 1); + + let bg_thread = loom::thread::spawn(move || drop(jh)); + rt.block_on(crate::task::yield_now()); + bg_thread.join().unwrap(); + + assert_eq!(waker_refcnt.load(Ordering::Relaxed), 0); + drop(task_refcnt); + }); +} + struct BlockedFuture { rx: Receiver<()>, num_polls: Arc, diff --git a/tokio/src/runtime/tests/loom_multi_thread.rs b/tokio/src/runtime/tests/loom_multi_thread.rs index e2706e65c65..ddd14b7fb3f 100644 --- a/tokio/src/runtime/tests/loom_multi_thread.rs +++ b/tokio/src/runtime/tests/loom_multi_thread.rs @@ -10,7 +10,6 @@ mod yield_now; /// In order to speed up the C use crate::runtime::tests::loom_oneshot as oneshot; use crate::runtime::{self, Runtime}; -use crate::sync::mpsc::channel; use crate::{spawn, task}; use tokio_test::assert_ok; @@ -460,32 +459,3 @@ impl Future for Track { }) } } - -#[test] -fn drop_tasks_with_reference_cycle() { - loom::model(|| { - let pool = mk_pool(2); - - pool.block_on(async move { - let (tx, mut rx) = channel(1); - - let (a_closer, mut wait_for_close_a) = channel::<()>(1); - let (b_closer, mut wait_for_close_b) = channel::<()>(1); - - let a = spawn(async move { - let b = rx.recv().await.unwrap(); - - futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await; - }); - - let b = spawn(async move { - let _ = a.await; - let _ = b_closer.send(()).await; - }); - - tx.send(b).await.unwrap(); - - futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await; - }); - }); -} diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index ea48b8e5199..66d4b8c2773 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -1,7 +1,9 @@ use crate::runtime::task::{ self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks, }; -use crate::runtime::tests::NoopSchedule; +use crate::runtime::{self, tests::NoopSchedule}; +use crate::spawn; +use crate::sync::{mpsc, Barrier}; use std::collections::VecDeque; use std::future::Future; @@ -45,6 +47,41 @@ impl Drop for AssertDrop { } } +#[test] +fn drop_tasks_with_reference_cycle() { + let rt = runtime::Builder::new_current_thread().build().unwrap(); + + rt.block_on(async { + let (tx, mut rx) = mpsc::channel(1); + + let barrier = Arc::new(Barrier::new(3)); + let barrier_a = barrier.clone(); + let barrier_b = barrier.clone(); + + let a = spawn(async move { + let b = rx.recv().await.unwrap(); + + // Poll the JoinHandle once. This registers the waker. + // The other task cannot have finished at this point due to the barrier below. + futures::future::select(b, std::future::ready(())).await; + + barrier_a.wait().await; + }); + + let b = spawn(async move { + // Poll the JoinHandle once. This registers the waker. + // The other task cannot have finished at this point due to the barrier below. + futures::future::select(a, std::future::ready(())).await; + + barrier_b.wait().await; + }); + + tx.send(b).await.unwrap(); + + barrier.wait().await; + }); +} + // A Notified does not shut down on drop, but it is dropped once the ref-count // hits zero. #[test] From 6700e6601245cf0c2754ceb7e25a478a88dee9a5 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sun, 29 Dec 2024 16:52:10 +0100 Subject: [PATCH 6/7] Move test to tests/rt_handle.rs --- tokio/src/runtime/tests/task.rs | 39 +-------------------------------- tokio/tests/rt_handle.rs | 36 ++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 66d4b8c2773..ea48b8e5199 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -1,9 +1,7 @@ use crate::runtime::task::{ self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks, }; -use crate::runtime::{self, tests::NoopSchedule}; -use crate::spawn; -use crate::sync::{mpsc, Barrier}; +use crate::runtime::tests::NoopSchedule; use std::collections::VecDeque; use std::future::Future; @@ -47,41 +45,6 @@ impl Drop for AssertDrop { } } -#[test] -fn drop_tasks_with_reference_cycle() { - let rt = runtime::Builder::new_current_thread().build().unwrap(); - - rt.block_on(async { - let (tx, mut rx) = mpsc::channel(1); - - let barrier = Arc::new(Barrier::new(3)); - let barrier_a = barrier.clone(); - let barrier_b = barrier.clone(); - - let a = spawn(async move { - let b = rx.recv().await.unwrap(); - - // Poll the JoinHandle once. This registers the waker. - // The other task cannot have finished at this point due to the barrier below. - futures::future::select(b, std::future::ready(())).await; - - barrier_a.wait().await; - }); - - let b = spawn(async move { - // Poll the JoinHandle once. This registers the waker. - // The other task cannot have finished at this point due to the barrier below. - futures::future::select(a, std::future::ready(())).await; - - barrier_b.wait().await; - }); - - tx.send(b).await.unwrap(); - - barrier.wait().await; - }); -} - // A Notified does not shut down on drop, but it is dropped once the ref-count // hits zero. #[test] diff --git a/tokio/tests/rt_handle.rs b/tokio/tests/rt_handle.rs index 9efe9b4bde9..1c5c96b6e1e 100644 --- a/tokio/tests/rt_handle.rs +++ b/tokio/tests/rt_handle.rs @@ -3,6 +3,8 @@ #![cfg(feature = "full")] use tokio::runtime::Runtime; +use tokio::sync::{mpsc, Barrier}; +use std::sync::Arc; #[test] #[cfg_attr(panic = "abort", ignore)] @@ -65,6 +67,40 @@ fn interleave_then_enter() { let _enter = rt3.enter(); } +// If the cycle causes a leak, then miri will catch it. +#[test] +fn drop_tasks_with_reference_cycle() { + rt().block_on(async { + let (tx, mut rx) = mpsc::channel(1); + + let barrier = Arc::new(Barrier::new(3)); + let barrier_a = barrier.clone(); + let barrier_b = barrier.clone(); + + let a = tokio::spawn(async move { + let b = rx.recv().await.unwrap(); + + // Poll the JoinHandle once. This registers the waker. + // The other task cannot have finished at this point due to the barrier below. + futures::future::select(b, std::future::ready(())).await; + + barrier_a.wait().await; + }); + + let b = tokio::spawn(async move { + // Poll the JoinHandle once. This registers the waker. + // The other task cannot have finished at this point due to the barrier below. + futures::future::select(a, std::future::ready(())).await; + + barrier_b.wait().await; + }); + + tx.send(b).await.unwrap(); + + barrier.wait().await; + }); +} + #[cfg(tokio_unstable)] mod unstable { use super::*; From 3e36c6079a48795b3944b7993f65d0463e237402 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sun, 29 Dec 2024 16:59:09 +0100 Subject: [PATCH 7/7] fmt --- tokio/tests/rt_handle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/rt_handle.rs b/tokio/tests/rt_handle.rs index 1c5c96b6e1e..3773b8972af 100644 --- a/tokio/tests/rt_handle.rs +++ b/tokio/tests/rt_handle.rs @@ -2,9 +2,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::sync::Arc; use tokio::runtime::Runtime; use tokio::sync::{mpsc, Barrier}; -use std::sync::Arc; #[test] #[cfg_attr(panic = "abort", ignore)]