diff --git a/cmd/zfs_object_agent/util/src/lib.rs b/cmd/zfs_object_agent/util/src/lib.rs
index cd192f481db5..f7305e7aca29 100644
--- a/cmd/zfs_object_agent/util/src/lib.rs
+++ b/cmd/zfs_object_agent/util/src/lib.rs
@@ -17,7 +17,6 @@ mod lock_set;
 mod logging;
 pub mod measure;
 pub mod message;
-mod mutex_ext;
 mod nicenum;
 mod range_tree;
 pub mod serde;
@@ -48,7 +47,6 @@ pub use logging::log;
 pub use logging::register_siguser1_to_dump_tracing;
 pub use logging::setup_logging;
 pub use logging::SUPER_EXPENSIVE_TRACE;
-pub use mutex_ext::lock_non_send;
 pub use nicenum::nice_number_count;
 pub use nicenum::nice_number_time;
 pub use nicenum::nice_p2size;
diff --git a/cmd/zfs_object_agent/util/src/measure/lock.rs b/cmd/zfs_object_agent/util/src/measure/lock.rs
new file mode 100644
index 000000000000..4246a72454a3
--- /dev/null
+++ b/cmd/zfs_object_agent/util/src/measure/lock.rs
@@ -0,0 +1,79 @@
+use std::ops::Deref;
+use std::ops::DerefMut;
+use std::time::Instant;
+
+use super::Measurement;
+
+pub struct MeasuredMutexGuard<'a, T> {
+    inner: tokio::sync::MutexGuard<'a, T>,
+    begin: Instant,
+    hold: &'static Measurement,
+}
+
+impl<'a, T> Drop for MeasuredMutexGuard<'a, T> {
+    fn drop(&mut self) {
+        self.hold.end_timed(self.begin);
+    }
+}
+
+impl<'a, T> Deref for MeasuredMutexGuard<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<'a, T> DerefMut for MeasuredMutexGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+pub async fn lock<'a, T>(
+    mutex: &'a tokio::sync::Mutex<T>,
+    acquire: &'static Measurement,
+    hold: &'static Measurement,
+) -> MeasuredMutexGuard<'a, T> {
+    MeasuredMutexGuard {
+        inner: acquire.fut_timed(mutex.lock()).await,
+        hold,
+        begin: hold.begin_timed(),
+    }
+}
+
+/// This locks the mutex like `Mutex::lock()`, but measures the time spent waiting for the lock,
+/// and the time spent holding the lock.
+#[macro_export]
+macro_rules! lock_measured {
+    ($lock:expr, $tag:literal) => {{
+        static ACQUIRE: $crate::measure::Measurement = $crate::measure::Measurement::new(concat!(
+            "acquire lock ",
+            $tag,
+            " (",
+            file!(),
+            ":",
+            line!(),
+            ":",
+            column!(),
+            ")"
+        ));
+        ACQUIRE.register();
+        static HOLD: $crate::measure::Measurement = $crate::measure::Measurement::new(concat!(
+            "hold lock ",
+            $tag,
+            " (",
+            file!(),
+            ":",
+            line!(),
+            ":",
+            column!(),
+            ")"
+        ));
+        HOLD.register();
+
+        $crate::measure::lock::lock($lock, &ACQUIRE, &HOLD)
+    }};
+    ($lock:expr) => {
+        $crate::lock_measured!($lock, "")
+    };
+}
diff --git a/cmd/zfs_object_agent/util/src/measure/lock_non_send.rs b/cmd/zfs_object_agent/util/src/measure/lock_non_send.rs
new file mode 100644
index 000000000000..788ef10e8c35
--- /dev/null
+++ b/cmd/zfs_object_agent/util/src/measure/lock_non_send.rs
@@ -0,0 +1,75 @@
+use std::marker::PhantomData;
+use std::ops::Deref;
+use std::ops::DerefMut;
+
+use super::lock::MeasuredMutexGuard;
+use super::Measurement;
+
+pub struct NonSendMeasuredMutexGuard<'a, T> {
+    inner: MeasuredMutexGuard<'a, T>,
+    _marker: PhantomData<*const ()>,
+}
+
+impl<'a, T> Deref for NonSendMeasuredMutexGuard<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<'a, T> DerefMut for NonSendMeasuredMutexGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+pub async fn lock_non_send<'a, T>(
+    mutex: &'a tokio::sync::Mutex<T>,
+    acquire: &'static Measurement,
+    hold: &'static Measurement,
+) -> NonSendMeasuredMutexGuard<'a, T> {
+    NonSendMeasuredMutexGuard {
+        inner: super::lock::lock(mutex, acquire, hold).await,
+        _marker: PhantomData,
+    }
+}
+
+/// This locks the mutex like `Mutex::lock()`, but measures it (like `lock_measured!`), and
+/// returns a new kind of guard which can not be sent between threads. This is useful if you
+/// want to ensure that .await is not used while the mutex is locked by some callers, but .await
+/// can be used from other callers (that use `lock_measured!` or `tokio::sync::Mutex::lock()`
+/// directly).
+#[macro_export]
+macro_rules! lock_non_send_measured {
+    ($lock:expr, $tag:literal) => {{
+        static ACQUIRE: $crate::measure::Measurement = $crate::measure::Measurement::new(concat!(
+            "acquire lock non send",
+            $tag,
+            " (",
+            file!(),
+            ":",
+            line!(),
+            ":",
+            column!(),
+            ")"
+        ));
+        ACQUIRE.register();
+        static HOLD: $crate::measure::Measurement = $crate::measure::Measurement::new(concat!(
+            "hold lock non send",
+            $tag,
+            " (",
+            file!(),
+            ":",
+            line!(),
+            ":",
+            column!(),
+            ")"
+        ));
+        HOLD.register();
+
+        $crate::measure::lock_non_send::lock_non_send($lock, &ACQUIRE, &HOLD)
+    }};
+    ($lock:expr) => {
+        $crate::lock_non_send_measured!($lock, "")
+    };
+}
diff --git a/cmd/zfs_object_agent/util/src/measure.rs b/cmd/zfs_object_agent/util/src/measure/mod.rs
similarity index 93%
rename from cmd/zfs_object_agent/util/src/measure.rs
rename to cmd/zfs_object_agent/util/src/measure/mod.rs
index cdaf230bbe38..8661b93e2742 100644
--- a/cmd/zfs_object_agent/util/src/measure.rs
+++ b/cmd/zfs_object_agent/util/src/measure/mod.rs
@@ -1,3 +1,6 @@
+pub mod lock;
+pub mod lock_non_send;
+
 use core::fmt;
 use std::fmt::Display;
 use std::future::Future;
@@ -49,6 +52,27 @@ impl Measurement {
         });
     }

+    fn begin(&self) {
+        self.count.fetch_add(1, Ordering::Relaxed);
+        self.inflight.fetch_add(1, Ordering::Relaxed);
+    }
+
+    fn end(&self) {
+        self.inflight.fetch_sub(1, Ordering::Relaxed);
+    }
+
+    fn begin_timed(&self) -> Instant {
+        self.begin();
+        Instant::now()
+    }
+
+    fn end_timed(&self, begin: Instant) {
+        self.end();
+        #[allow(clippy::cast_possible_truncation)]
+        let elapsed = begin.elapsed().as_nanos() as u64;
+        self.nanos.fetch_add(elapsed, Ordering::Relaxed);
+    }
+
     /// Wrap the provided future in one that will measure its execution.
     // Lifetime annotations say that self must live longer than the `future` argument. This is
     // typically satisfied by `&'static self`, i.e. the static Measurement created by `measure!()`.
@@ -66,12 +90,9 @@ impl Measurement {
             // times.
             self.fut_size.store(size_of_val(&future), Ordering::Relaxed);
         }
-        self.count.fetch_add(1, Ordering::Relaxed);
-        self.inflight.fetch_add(1, Ordering::Relaxed);
+        self.begin();
         // We don't use an async function or closure because it doubles the size of the future.
-        future.inspect(move |_| {
-            self.inflight.fetch_sub(1, Ordering::Relaxed);
-        })
+        future.inspect(|_| self.end())
     }

     pub fn fut_timed<'a, 'b, R>(
@@ -95,10 +116,9 @@ impl Measurement {
     where
         F: FnOnce() -> R,
     {
-        self.count.fetch_add(1, Ordering::Relaxed);
-        self.inflight.fetch_add(1, Ordering::Relaxed);
+        self.begin();
         let result = f();
-        self.inflight.fetch_sub(1, Ordering::Relaxed);
+        self.end();
         result
     }

@@ -107,11 +127,9 @@ impl Measurement {
     where
         F: FnOnce() -> R,
     {
-        let begin = Instant::now();
+        let begin = self.begin_timed();
         let result = self.func(f);
-        #[allow(clippy::cast_possible_truncation)]
-        let elapsed = begin.elapsed().as_nanos() as u64;
-        self.nanos.fetch_add(elapsed, Ordering::Relaxed);
+        self.end_timed(begin);
         result
     }

diff --git a/cmd/zfs_object_agent/util/src/mutex_ext.rs b/cmd/zfs_object_agent/util/src/mutex_ext.rs
deleted file mode 100644
index 78b4b610eb77..000000000000
--- a/cmd/zfs_object_agent/util/src/mutex_ext.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-use std::marker::PhantomData;
-use std::ops::Deref;
-use std::ops::DerefMut;
-
-pub struct NonSendMutexGuard<'a, T> {
-    inner: tokio::sync::MutexGuard<'a, T>,
-    // force this to not be Send
-    _marker: PhantomData<*const ()>,
-}
-
-impl<'a, T> Deref for NonSendMutexGuard<'a, T> {
-    type Target = T;
-    fn deref(&self) -> &Self::Target {
-        &self.inner
-    }
-}
-
-impl<'a, T> DerefMut for NonSendMutexGuard<'a, T> {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.inner
-    }
-}
-
-// This locks the mutex like Mutex::lock(), but returns a new kind of guard
-// which can not be sent between threads. This is useful if you want to ensure
-// that .await is not used while the mutex is locked by some callers, but .await
-// can be used from other callers (that use tokio::sync::Mutex::lock()
-// directly).
-pub async fn lock_non_send<T>(mutex: &tokio::sync::Mutex<T>) -> NonSendMutexGuard<'_, T> {
-    // It would be nice to do this via an async_trait, but that requires a memory
-    // allocation each time it's called, which can impact performance.
-    NonSendMutexGuard {
-        inner: mutex.lock().await,
-        _marker: PhantomData,
-    }
-}
diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs
index 1a125b041767..e482ad0120ec 100644
--- a/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs
+++ b/cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs
@@ -103,7 +103,9 @@ impl MergeMessage {
         let timer = Instant::now();
         let free_count = frees.len();
         let cache_updates_count = cache_updates.len();
-        let (new_index, index_delta) = next_index.flush().await;
+        let (new_index, index_delta) = measure!("new_progress() next_index.flush()")
+            .fut_timed(next_index.flush())
+            .await;
         let message = MergeProgress {
             new_index,
             index_delta,
@@ -265,6 +267,8 @@ impl Progress {
     /// nothing to send.
     async fn report(&mut self) {
         if let Some(last_key) = self.last_key {
+            let entries_len = self.entries.len();
+            let frees_len = self.frees.len();
             measure!("Progress::report() tx.send(IndexMessage)")
                 .fut_timed(self.tx.send(IndexMessage {
                     last_key,
@@ -280,9 +284,7 @@
                 .await
                 .unwrap_or_else(|e| panic!("couldn't send: {e}"));
             trace!(
-                "Collected and sent {} entries and {} frees to next_index_task in {}ms",
-                self.entries.len(),
-                self.frees.len(),
+                "Collected and sent {entries_len} entries and {frees_len} frees to next_index_task after {}ms",
                 self.timer.elapsed().as_millis()
             );
             self.timer = Instant::now();
diff --git a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs
index add876f312d7..284e8220212e 100644
--- a/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs
+++ b/cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs
@@ -44,7 +44,8 @@ use tokio::sync::OwnedSemaphorePermit;
 use tokio::sync::Semaphore;
 use tokio::time::sleep_until;
 use util::concurrent_batch::ConcurrentBatch;
-use util::lock_non_send;
+use util::lock_measured;
+use util::lock_non_send_measured;
 use util::measure;
 use util::message::ExpandDiskResponse;
 use util::nice_p2size;
@@ -819,7 +820,7 @@ impl Inner {
                     CACHE_INSERT_DEMAND_BUFFER_SIZE.as_u64()
                         - cache.demand_buffer_bytes_available.available_permits() as u64,
                 );
-                lock_non_send(&locked).await.update_stats();
+                lock_non_send_measured!(&locked).await.update_stats();
             }
         });

@@ -881,7 +882,7 @@
         loop {
             // if there is no current merging state, check to see if a merge should be started
             {
-                let mut locked = self.locked.lock().await;
+                let mut locked = lock_measured!(&self.locked).await;
                 if locked.merge.is_none() {
                     assert!(merging.is_none());
                     merging = locked.try_start_merge_task(self.old_index.clone()).await;
@@ -931,7 +932,7 @@
             );

             {
-                let mut locked = self.locked.lock().await;
+                let mut locked = lock_non_send_measured!(&self.locked).await;
                 let begin = Instant::now();

                 // free the extent ranges associated with the evicted blocks
@@ -993,7 +994,7 @@
         let mut old_index = self.old_index.write().await;
         let mut new_index_opt = self.new_index.write().await;
-        let mut locked = self.locked.lock().await;
+        let mut locked = lock_measured!(&self.locked).await;
         locked.rotate_index(&mut old_index, new_index).await;
         locked.block_allocator.rebalance_fini();
         *new_index_opt = None;
@@ -1052,7 +1053,10 @@
         let begin = Instant::now();
         // Bind to a variable here so that we can drop the state lock before waiting for the
         // batch of reads to complete.
-        let outstanding_reads = lock_non_send(&self.locked).await.outstanding_reads.rotate();
+        let outstanding_reads = lock_non_send_measured!(&self.locked)
+            .await
+            .outstanding_reads
+            .rotate();
         outstanding_reads.await;
         debug!(
             "waited for outstanding_reads in {}ms",
@@ -1065,7 +1069,7 @@
         // index/operation_log will actually have the correct contents. See above comments
         // on how the ConcurrentBatch is manipulated.
         let begin = Instant::now();
-        let outstanding_writes = lock_non_send(&self.locked)
+        let outstanding_writes = lock_non_send_measured!(&self.locked)
             .await
             .outstanding_writes
             .rotate();
@@ -1078,7 +1082,7 @@
         let (old_index_phys, delta) = self.old_index.write().await.flush().await;
         assert!(delta.is_empty());

-        let mut locked = self.locked.lock().await;
+        let mut locked = lock_measured!(&self.locked).await;

         // Now that we have the state lock, we need to wait for outstanding i/os again, because
         // more i/os could have been initiated while we were waiting above. Those i/os will
@@ -1183,7 +1187,7 @@
         let fut_or_f = {
             // We don't want to hold the state lock while reading from disk so we use
             // lock_non_send() to ensure that we can't hold it across .await.
-            let mut locked = measure!().fut(lock_non_send(&self.locked)).await;
+            let mut locked = lock_non_send_measured!(&self.locked).await;

             let got_value = |locked: &mut Locked, f: F, counter, value| {
                 if count_ghost_hits {
@@ -1265,7 +1269,7 @@
             Some(entry) => {
                 // Again, we don't want to hold the state lock while reading from disk so we use
                 // lock_non_send() to ensure that we can't hold it across .await.
-                let mut locked = measure!().fut(lock_non_send(&self.locked)).await;
+                let mut locked = lock_non_send_measured!(&self.locked).await;

                 // The LockedKey prevents an entry for this key from being inserted while we
                 // weren't holding the state lock.
@@ -1282,7 +1286,7 @@
             None => {
                 // key not in index
                 super_trace!("lookup {key:?}: cache miss after reading index");
-                let mut locked = measure!().fut(lock_non_send(&self.locked)).await;
+                let mut locked = lock_non_send_measured!(&self.locked).await;
                 f(&mut locked, None)
             }
         };
@@ -1344,11 +1348,10 @@
             bytes
         };
         let len = bytes.len() as u64;
-        let fut = measure!().fut(lock_non_send(&self.locked)).await.insert(
-            locked_key,
-            &self.pool_guids,
-            bytes,
-        );
+        let fut =
+            lock_non_send_measured!(&self.locked)
+                .await
+                .insert(locked_key, &self.pool_guids, bytes);
         match measure!().fut(fut).await {
             Ok(_) => {
                 self.stats.track_bytes(InsertBytes, len);
@@ -1483,20 +1486,20 @@
     }

     async fn add_disk(&self, path: &Path) -> Result<()> {
-        self.locked.lock().await.add_disk(path)?;
+        lock_measured!(&self.locked).await.add_disk(path)?;
         self.sync_checkpoint().await;
         Ok(())
     }

     // Returns the amount of additional space, in bytes
     async fn expand_disk(&self, path: &Path) -> Result<u64> {
-        let additional_bytes = self.locked.lock().await.expand_disk(path)?;
+        let additional_bytes = lock_measured!(&self.locked).await.expand_disk(path)?;
         self.sync_checkpoint().await;
         Ok(additional_bytes)
     }

     async fn initiate_merge(&self) {
-        self.locked.lock().await.request_merge();
+        lock_measured!(&self.locked).await.request_merge();
         self.sync_checkpoint().await;
     }

@@ -1519,11 +1522,11 @@
     }

     async fn hits_by_size_data(&self) -> SizeHistogramPhys {
-        self.locked.lock().await.size_histogram.clone()
+        lock_measured!(&self.locked).await.size_histogram.clone()
     }

     async fn clear_hit_data(&self) {
-        self.locked.lock().await.clear_hit_data();
+        lock_measured!(&self.locked).await.clear_hit_data();
     }

     fn devices(&self) -> DeviceList {
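As a brief aid for reviewers, a minimal usage sketch of the two macros introduced above follows. The Counters, hits, bump, and bump_non_send names are hypothetical; the util::lock_measured! and util::lock_non_send_measured! macros, their optional tag argument, and the .await call pattern are taken from the patch itself.

use tokio::sync::Mutex;
use util::lock_measured;
use util::lock_non_send_measured;

// Hypothetical caller code; only the macros and their semantics come from this patch.
struct Counters {
    hits: u64,
}

async fn bump(counters: &Mutex<Counters>) {
    // Registers per-call-site "acquire lock ..." / "hold lock ..." Measurements, times both
    // the wait for the lock and how long the guard is held, and otherwise behaves like
    // Mutex::lock(): the guard derefs to Counters and unlocks (ending the hold timer) on drop.
    lock_measured!(counters, "bump").await.hits += 1;
}

async fn bump_non_send(counters: &Mutex<Counters>) {
    // Same measurements, but the returned guard is not Send, so a task that must be Send
    // cannot hold it across a later .await point. The tag argument is optional in both macros.
    lock_non_send_measured!(counters, "bump_non_send").await.hits += 1;
}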