From 686db4755913947219f83a04b21885032685d246 Mon Sep 17 00:00:00 2001 From: Bryan Hitchcock Date: Sun, 29 May 2022 15:26:12 -0400 Subject: [PATCH 1/5] Add Condvar::wait_while convenience methods --- src/condvar.rs | 249 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) diff --git a/src/condvar.rs b/src/condvar.rs index 46a5d448..bbe7f533 100644 --- a/src/condvar.rs +++ b/src/condvar.rs @@ -14,6 +14,7 @@ use core::{ }; use lock_api::RawMutex as RawMutex_; use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN}; +use std::ops::DerefMut; use std::time::{Duration, Instant}; /// A type indicating whether a timed wait on a condition variable returned @@ -29,6 +30,17 @@ impl WaitTimeoutResult { } } +/// A type indicating how many times a thread was blocked during wait_while. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub struct WaitWhileResult(u32); + +impl WaitWhileResult { + #[inline] + pub fn num_iters(self) -> u32 { + self.0 + } +} + /// A Condition Variable /// /// Condition variables represent the ability to block a thread such that it @@ -383,6 +395,134 @@ impl Condvar { let deadline = util::to_deadline(timeout); self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline) } + + #[inline] + fn wait_while_until_internal( + &self, + mutex_guard: &mut MutexGuard<'_, T>, + mut condition: F, + timeout: Option, + ) -> (WaitWhileResult, WaitTimeoutResult) + where + T: ?Sized, + F: FnMut(&mut T) -> bool, + { + let mut result = WaitWhileResult(0); + let mut timeout_result = WaitTimeoutResult(false); + + while !timeout_result.timed_out() && condition(mutex_guard.deref_mut()) { + result.0 += 1; + timeout_result = + self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout); + } + + (result, timeout_result) + } + /// Blocks the current thread until this condition variable receives a + /// notification. If the provided condition evaluates to `false`, then the + /// thread is no longer blocked and the operation is completed. If the + /// condition evaluates to `true`, then the thread is blocked again and + /// waits for another notification before repeating this process. + /// + /// This function will atomically unlock the mutex specified (represented by + /// `mutex_guard`) and block the current thread. This means that any calls + /// to `notify_*()` which happen logically after the mutex is unlocked are + /// candidates to wake this thread up. When this function call returns, the + /// lock specified will have been re-acquired. + /// + /// # Panics + /// + /// This function will panic if another thread is waiting on the `Condvar` + /// with a different `Mutex` object. + #[inline] + pub fn wait_while( + &self, + mutex_guard: &mut MutexGuard<'_, T>, + condition: F, + ) -> WaitWhileResult + where + T: ?Sized, + F: FnMut(&mut T) -> bool, + { + self.wait_while_until_internal(mutex_guard, condition, None) + .0 + } + + /// Waits on this condition variable for a notification, timing out after + /// the specified time instant. If the provided condition evaluates to + /// `false`, then the thread is no longer blocked and the operation is + /// completed. If the condition evaluates to `true`, then the thread is + /// blocked again and waits for another notification before repeating + /// this process. + /// + /// The semantics of this function are equivalent to `wait()` except that + /// the thread will be blocked roughly until `timeout` is reached. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `timeout`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned `WaitTimeoutResult` value indicates if the timeout is + /// known to have elapsed. + /// + /// Like `wait`, the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + /// + /// # Panics + /// + /// This function will panic if another thread is waiting on the `Condvar` + /// with a different `Mutex` object. + #[inline] + pub fn wait_while_until( + &self, + mutex_guard: &mut MutexGuard<'_, T>, + condition: F, + timeout: Instant, + ) -> (WaitWhileResult, WaitTimeoutResult) + where + T: ?Sized, + F: FnMut(&mut T) -> bool, + { + self.wait_while_until_internal(mutex_guard, condition, Some(timeout)) + } + + /// Waits on this condition variable for a notification, timing out after a + /// specified duration. If the provided condition evaluates to `false`, + /// then the thread is no longer blocked and the operation is completed. + /// If the condition evaluates to `true`, then the thread is blocked again + /// and waits for another notification before repeating this process. + /// + /// The semantics of this function are equivalent to `wait()` except that + /// the thread will be blocked for roughly no longer than `timeout`. This + /// method should not be used for precise timing due to anomalies such as + /// preemption or platform differences that may not cause the maximum + /// amount of time waited to be precisely `timeout`. + /// + /// Note that the best effort is made to ensure that the time waited is + /// measured with a monotonic clock, and not affected by the changes made to + /// the system time. + /// + /// The returned `WaitTimeoutResult` value indicates if the timeout is + /// known to have elapsed. + /// + /// Like `wait`, the lock specified will be re-acquired when this function + /// returns, regardless of whether the timeout elapsed or not. + #[inline] + pub fn wait_while_for( + &self, + mutex_guard: &mut MutexGuard<'_, T>, + condition: F, + timeout: Duration, + ) -> (WaitWhileResult, WaitTimeoutResult) + where + F: FnMut(&mut T) -> bool, + { + let deadline = util::to_deadline(timeout); + self.wait_while_until_internal(mutex_guard, condition, deadline) + } } impl Default for Condvar { @@ -404,6 +544,8 @@ mod tests { use std::sync::mpsc::channel; use std::sync::Arc; use std::thread; + use std::thread::sleep; + use std::thread::JoinHandle; use std::time::Duration; use std::time::Instant; @@ -572,6 +714,113 @@ mod tests { drop(g); } + fn spawn_wait_while_notifier( + mutex: Arc>, + cv: Arc, + num_iters: u32, + timeout: Option, + ) -> JoinHandle<()> { + thread::spawn(move || { + for epoch in 1..=num_iters { + // spin to wait for main test thread to block + // before notifying it to wake back up and check + // its condition. + let mut sleep_backoff = Duration::from_millis(1); + let _mutex_guard = loop { + let mutex_guard = mutex.lock(); + + if let Some(timeout) = timeout { + if Instant::now() >= timeout { + return; + } + } + + if *mutex_guard == epoch { + break mutex_guard; + } + + drop(mutex_guard); + + // give main test thread a good chance to + // acquire the lock before this thread does. + sleep(sleep_backoff); + sleep_backoff *= 2; + }; + + cv.notify_one(); + } + }) + } + + #[test] + fn wait_while_until_internal_does_not_wait_if_initially_false() { + let mutex = Arc::new(Mutex::new(())); + let cv = Arc::new(Condvar::new()); + + let mut mutex_guard = mutex.lock(); + + let result = cv + .wait_while_until_internal(&mut mutex_guard, |_| false, None) + .0; + + assert!(result.num_iters() == 0); + } + + #[test] + fn wait_while_until_internal_times_out_before_false() { + let mutex = Arc::new(Mutex::new(0)); + let cv = Arc::new(Condvar::new()); + + let condition = |counter: &mut u32| { + *counter += 1; + true + }; + + let mut mutex_guard = mutex.lock(); + let timeout = Some(Instant::now() + Duration::from_millis(50)); + let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), u32::MAX, timeout); + + let (result, timeout_result) = + cv.wait_while_until_internal(&mut mutex_guard, condition, timeout); + + assert!(timeout_result.timed_out()); + assert!(result.num_iters() > 0); + assert!(result.num_iters() < u32::MAX); + + // prevent deadlock with notifier + drop(mutex_guard); + handle.join().unwrap(); + } + + #[test] + fn wait_while_until_internal() { + let mutex = Arc::new(Mutex::new(0)); + let cv = Arc::new(Condvar::new()); + + let num_iters = 4; + + let condition = |counter: &mut u32| { + *counter += 1; + *counter <= num_iters + }; + + let mut mutex_guard = mutex.lock(); + let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None); + + let (result, timeout_result) = + cv.wait_while_until_internal(&mut mutex_guard, condition, None); + + assert!(!timeout_result.timed_out()); + assert!(result.num_iters() == num_iters); + assert!(*mutex_guard == num_iters + 1); + + let result = cv.wait_while(&mut mutex_guard, condition); + handle.join().unwrap(); + + assert!(result.num_iters() == 0); + assert!(*mutex_guard == num_iters + 2); + } + #[test] #[should_panic] fn two_mutexes() { From 045828381a5facff71b1aad3c01af15801959458 Mon Sep 17 00:00:00 2001 From: Bryan Hitchcock Date: Sun, 29 May 2022 16:21:56 -0400 Subject: [PATCH 2/5] Use saturating_add for WaitWhileResult --- src/condvar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/condvar.rs b/src/condvar.rs index bbe7f533..5d0b893d 100644 --- a/src/condvar.rs +++ b/src/condvar.rs @@ -411,7 +411,7 @@ impl Condvar { let mut timeout_result = WaitTimeoutResult(false); while !timeout_result.timed_out() && condition(mutex_guard.deref_mut()) { - result.0 += 1; + result.0 = result.0.saturating_add(1); timeout_result = self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout); } From 26e19dced4ae01000abc0dfca251f56695b1b8aa Mon Sep 17 00:00:00 2001 From: Bryan Hitchcock Date: Mon, 30 May 2022 20:13:06 -0400 Subject: [PATCH 3/5] Remove WaitWhileResult --- src/condvar.rs | 65 ++++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/src/condvar.rs b/src/condvar.rs index 5d0b893d..069f2b53 100644 --- a/src/condvar.rs +++ b/src/condvar.rs @@ -30,17 +30,6 @@ impl WaitTimeoutResult { } } -/// A type indicating how many times a thread was blocked during wait_while. -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub struct WaitWhileResult(u32); - -impl WaitWhileResult { - #[inline] - pub fn num_iters(self) -> u32 { - self.0 - } -} - /// A Condition Variable /// /// Condition variables represent the ability to block a thread such that it @@ -402,21 +391,19 @@ impl Condvar { mutex_guard: &mut MutexGuard<'_, T>, mut condition: F, timeout: Option, - ) -> (WaitWhileResult, WaitTimeoutResult) + ) -> WaitTimeoutResult where T: ?Sized, F: FnMut(&mut T) -> bool, { - let mut result = WaitWhileResult(0); - let mut timeout_result = WaitTimeoutResult(false); + let mut result = WaitTimeoutResult(false); - while !timeout_result.timed_out() && condition(mutex_guard.deref_mut()) { - result.0 = result.0.saturating_add(1); - timeout_result = + while !result.timed_out() && condition(mutex_guard.deref_mut()) { + result = self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout); } - (result, timeout_result) + result } /// Blocks the current thread until this condition variable receives a /// notification. If the provided condition evaluates to `false`, then the @@ -439,13 +426,12 @@ impl Condvar { &self, mutex_guard: &mut MutexGuard<'_, T>, condition: F, - ) -> WaitWhileResult + ) -> WaitTimeoutResult where T: ?Sized, F: FnMut(&mut T) -> bool, { self.wait_while_until_internal(mutex_guard, condition, None) - .0 } /// Waits on this condition variable for a notification, timing out after @@ -481,7 +467,7 @@ impl Condvar { mutex_guard: &mut MutexGuard<'_, T>, condition: F, timeout: Instant, - ) -> (WaitWhileResult, WaitTimeoutResult) + ) -> WaitTimeoutResult where T: ?Sized, F: FnMut(&mut T) -> bool, @@ -516,7 +502,7 @@ impl Condvar { mutex_guard: &mut MutexGuard<'_, T>, condition: F, timeout: Duration, - ) -> (WaitWhileResult, WaitTimeoutResult) + ) -> WaitTimeoutResult where F: FnMut(&mut T) -> bool, { @@ -754,16 +740,20 @@ mod tests { #[test] fn wait_while_until_internal_does_not_wait_if_initially_false() { - let mutex = Arc::new(Mutex::new(())); + let mutex = Arc::new(Mutex::new(0)); let cv = Arc::new(Condvar::new()); - let mut mutex_guard = mutex.lock(); + let condition = |counter: &mut u32| { + *counter += 1; + false + }; - let result = cv - .wait_while_until_internal(&mut mutex_guard, |_| false, None) - .0; + let mut mutex_guard = mutex.lock(); + let timeout_result = cv + .wait_while_until_internal(&mut mutex_guard, condition, None); - assert!(result.num_iters() == 0); + assert!(!timeout_result.timed_out()); + assert!(*mutex_guard == 1); } #[test] @@ -771,21 +761,23 @@ mod tests { let mutex = Arc::new(Mutex::new(0)); let cv = Arc::new(Condvar::new()); + let num_iters = u32::MAX; let condition = |counter: &mut u32| { *counter += 1; true }; let mut mutex_guard = mutex.lock(); - let timeout = Some(Instant::now() + Duration::from_millis(50)); - let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), u32::MAX, timeout); + let timeout = Some(Instant::now() + Duration::from_millis(500)); + let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout); - let (result, timeout_result) = + let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, timeout); assert!(timeout_result.timed_out()); - assert!(result.num_iters() > 0); - assert!(result.num_iters() < u32::MAX); + // thread should be blocked + woken up multiple times + assert!(*mutex_guard > 2); + assert!(*mutex_guard < num_iters); // prevent deadlock with notifier drop(mutex_guard); @@ -807,17 +799,16 @@ mod tests { let mut mutex_guard = mutex.lock(); let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None); - let (result, timeout_result) = + let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None); assert!(!timeout_result.timed_out()); - assert!(result.num_iters() == num_iters); assert!(*mutex_guard == num_iters + 1); - let result = cv.wait_while(&mut mutex_guard, condition); + let timeout_result = cv.wait_while(&mut mutex_guard, condition); handle.join().unwrap(); - assert!(result.num_iters() == 0); + assert!(!timeout_result.timed_out()); assert!(*mutex_guard == num_iters + 2); } From fdb063cd4e33d9c44cc08aea0c341ffec7e7b282 Mon Sep 17 00:00:00 2001 From: Bryan Hitchcock Date: Mon, 30 May 2022 20:21:13 -0400 Subject: [PATCH 4/5] wait_while can't timeout fix --- src/condvar.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/condvar.rs b/src/condvar.rs index 069f2b53..debeb081 100644 --- a/src/condvar.rs +++ b/src/condvar.rs @@ -422,16 +422,12 @@ impl Condvar { /// This function will panic if another thread is waiting on the `Condvar` /// with a different `Mutex` object. #[inline] - pub fn wait_while( - &self, - mutex_guard: &mut MutexGuard<'_, T>, - condition: F, - ) -> WaitTimeoutResult + pub fn wait_while(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F) where T: ?Sized, F: FnMut(&mut T) -> bool, { - self.wait_while_until_internal(mutex_guard, condition, None) + self.wait_while_until_internal(mutex_guard, condition, None); } /// Waits on this condition variable for a notification, timing out after @@ -805,7 +801,7 @@ mod tests { assert!(!timeout_result.timed_out()); assert!(*mutex_guard == num_iters + 1); - let timeout_result = cv.wait_while(&mut mutex_guard, condition); + let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None); handle.join().unwrap(); assert!(!timeout_result.timed_out()); From ef12b00daf0dbc6dd025098ec2cd0517bb9f737c Mon Sep 17 00:00:00 2001 From: Bryan Hitchcock Date: Mon, 30 May 2022 20:24:09 -0400 Subject: [PATCH 5/5] small test update --- src/condvar.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/condvar.rs b/src/condvar.rs index debeb081..15daa694 100644 --- a/src/condvar.rs +++ b/src/condvar.rs @@ -757,7 +757,7 @@ mod tests { let mutex = Arc::new(Mutex::new(0)); let cv = Arc::new(Condvar::new()); - let num_iters = u32::MAX; + let num_iters = 3; let condition = |counter: &mut u32| { *counter += 1; true @@ -771,9 +771,7 @@ mod tests { cv.wait_while_until_internal(&mut mutex_guard, condition, timeout); assert!(timeout_result.timed_out()); - // thread should be blocked + woken up multiple times - assert!(*mutex_guard > 2); - assert!(*mutex_guard < num_iters); + assert!(*mutex_guard == num_iters + 1); // prevent deadlock with notifier drop(mutex_guard);