Skip to content

Commit

Permalink
measure zettacache::Locked acquisition and hold times (openzfs#478)
Browse files Browse the repository at this point in the history
Add 2 new macros to the `measure` infrastructure:
`lock_measured!(&tokio::sync::Mutex)` and
`lock_non_send_measured!(&tokio::sync::Mutex)`.  These macros measure
the lock acquision time, count, and number of in-flight waiters; as well
as the lock hold time and number of in-flight holders (which is at most
one).

Use this infrastructure to measure the `zettacache::Locked` lock for all
acquirers.

Remove the now-unused `lock_non_send()` function in favor of
`lock_non_send_measured!()`.
  • Loading branch information
ahrens authored Jun 12, 2022
1 parent e234ba7 commit 7a5dae7
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 75 deletions.
2 changes: 0 additions & 2 deletions cmd/zfs_object_agent/util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ mod lock_set;
mod logging;
pub mod measure;
pub mod message;
mod mutex_ext;
mod nicenum;
mod range_tree;
pub mod serde;
Expand Down Expand Up @@ -48,7 +47,6 @@ pub use logging::log;
pub use logging::register_siguser1_to_dump_tracing;
pub use logging::setup_logging;
pub use logging::SUPER_EXPENSIVE_TRACE;
pub use mutex_ext::lock_non_send;
pub use nicenum::nice_number_count;
pub use nicenum::nice_number_time;
pub use nicenum::nice_p2size;
Expand Down
79 changes: 79 additions & 0 deletions cmd/zfs_object_agent/util/src/measure/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::ops::Deref;
use std::ops::DerefMut;
use std::time::Instant;

use super::Measurement;

pub struct MeasuredMutexGuard<'a, T> {
inner: tokio::sync::MutexGuard<'a, T>,
begin: Instant,
hold: &'static Measurement,
}

impl<'a, T> Drop for MeasuredMutexGuard<'a, T> {
fn drop(&mut self) {
self.hold.end_timed(self.begin);
}
}

impl<'a, T> Deref for MeasuredMutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<'a, T> DerefMut for MeasuredMutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

pub async fn lock<'a, T>(
mutex: &'a tokio::sync::Mutex<T>,
acquire: &'static Measurement,
hold: &'static Measurement,
) -> MeasuredMutexGuard<'a, T> {
MeasuredMutexGuard {
inner: acquire.fut_timed(mutex.lock()).await,
hold,
begin: hold.begin_timed(),
}
}

/// This locks the mutex like `Mutex::lock()`, but measures the time spent waiting for the lock,
/// and the time spent holding the lock.
#[macro_export]
macro_rules! lock_measured {
($lock:expr, $tag:literal) => {{
static ACQUIRE: $crate::measure::Measurement = $crate::measure::Measurement::new(concat!(
"acquire lock ",
$tag,
" (",
file!(),
":",
line!(),
":",
column!(),
")"
));
ACQUIRE.register();
static HOLD: $crate::measure::Measurement = $crate::measure::Measurement::new(concat!(
"hold lock ",
$tag,
" (",
file!(),
":",
line!(),
":",
column!(),
")"
));
HOLD.register();

$crate::measure::lock::lock($lock, &ACQUIRE, &HOLD)
}};
($lock:expr) => {
$crate::lock_measured!($lock, "")
};
}
75 changes: 75 additions & 0 deletions cmd/zfs_object_agent/util/src/measure/lock_non_send.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::marker::PhantomData;
use std::ops::Deref;
use std::ops::DerefMut;

use super::lock::MeasuredMutexGuard;
use super::Measurement;

pub struct NonSendMeasuredMutexGuard<'a, T> {
inner: MeasuredMutexGuard<'a, T>,
_marker: PhantomData<*const ()>,
}

impl<'a, T> Deref for NonSendMeasuredMutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<'a, T> DerefMut for NonSendMeasuredMutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

pub async fn lock_non_send<'a, T>(
mutex: &'a tokio::sync::Mutex<T>,
acquire: &'static Measurement,
hold: &'static Measurement,
) -> NonSendMeasuredMutexGuard<'a, T> {
NonSendMeasuredMutexGuard {
inner: super::lock::lock(mutex, acquire, hold).await,
_marker: PhantomData,
}
}

/// This locks the mutex like `Mutex::lock()`, but measures it (like `lock_measured!`), and
/// returns a new kind of guard which can not be sent between threads. This is useful if you
/// want to ensure that .await is not used while the mutex is locked by some callers, but .await
/// can be used from other callers (that use `lock_measured!` or `tokio::sync::Mutex::lock()`
/// directly).
#[macro_export]
macro_rules! lock_non_send_measured {
($lock:expr, $tag:literal) => {{
static ACQUIRE: $crate::measure::Measurement = $crate::measure::Measurement::new(concat!(
"acquire lock non send",
$tag,
" (",
file!(),
":",
line!(),
":",
column!(),
")"
));
ACQUIRE.register();
static HOLD: $crate::measure::Measurement = $crate::measure::Measurement::new(concat!(
"hold lock non send",
$tag,
" (",
file!(),
":",
line!(),
":",
column!(),
")"
));
HOLD.register();

$crate::measure::lock_non_send::lock_non_send($lock, &ACQUIRE, &HOLD)
}};
($lock:expr) => {
$crate::lock_non_send_measured!($lock, "")
};
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod lock;
pub mod lock_non_send;

use core::fmt;
use std::fmt::Display;
use std::future::Future;
Expand Down Expand Up @@ -49,6 +52,27 @@ impl Measurement {
});
}

fn begin(&self) {
self.count.fetch_add(1, Ordering::Relaxed);
self.inflight.fetch_add(1, Ordering::Relaxed);
}

fn end(&self) {
self.inflight.fetch_sub(1, Ordering::Relaxed);
}

fn begin_timed(&self) -> Instant {
self.begin();
Instant::now()
}

fn end_timed(&self, begin: Instant) {
self.end();
#[allow(clippy::cast_possible_truncation)]
let elapsed = begin.elapsed().as_nanos() as u64;
self.nanos.fetch_add(elapsed, Ordering::Relaxed);
}

/// Wrap the provided future in one that will measure its execution.
// Lifetime annotations say that self must live longer than the `future` argument. This is
// typically satisfied by `&'static self`, i.e. the static Measurement created by `measure!()`.
Expand All @@ -66,12 +90,9 @@ impl Measurement {
// times.
self.fut_size.store(size_of_val(&future), Ordering::Relaxed);
}
self.count.fetch_add(1, Ordering::Relaxed);
self.inflight.fetch_add(1, Ordering::Relaxed);
self.begin();
// We don't use an async function or closure because it doubles the size of the future.
future.inspect(move |_| {
self.inflight.fetch_sub(1, Ordering::Relaxed);
})
future.inspect(|_| self.end())
}

pub fn fut_timed<'a, 'b, R>(
Expand All @@ -95,10 +116,9 @@ impl Measurement {
where
F: FnOnce() -> R,
{
self.count.fetch_add(1, Ordering::Relaxed);
self.inflight.fetch_add(1, Ordering::Relaxed);
self.begin();
let result = f();
self.inflight.fetch_sub(1, Ordering::Relaxed);
self.end();
result
}

Expand All @@ -107,11 +127,9 @@ impl Measurement {
where
F: FnOnce() -> R,
{
let begin = Instant::now();
let begin = self.begin_timed();
let result = self.func(f);
#[allow(clippy::cast_possible_truncation)]
let elapsed = begin.elapsed().as_nanos() as u64;
self.nanos.fetch_add(elapsed, Ordering::Relaxed);
self.end_timed(begin);
result
}

Expand Down
36 changes: 0 additions & 36 deletions cmd/zfs_object_agent/util/src/mutex_ext.rs

This file was deleted.

10 changes: 6 additions & 4 deletions cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ impl MergeMessage {
let timer = Instant::now();
let free_count = frees.len();
let cache_updates_count = cache_updates.len();
let (new_index, index_delta) = next_index.flush().await;
let (new_index, index_delta) = measure!("new_progress() next_index.flush()")
.fut_timed(next_index.flush())
.await;
let message = MergeProgress {
new_index,
index_delta,
Expand Down Expand Up @@ -265,6 +267,8 @@ impl Progress {
/// nothing to send.
async fn report(&mut self) {
if let Some(last_key) = self.last_key {
let entries_len = self.entries.len();
let frees_len = self.frees.len();
measure!("Progress::report() tx.send(IndexMessage)")
.fut_timed(self.tx.send(IndexMessage {
last_key,
Expand All @@ -280,9 +284,7 @@ impl Progress {
.await
.unwrap_or_else(|e| panic!("couldn't send: {e}"));
trace!(
"Collected and sent {} entries and {} frees to next_index_task in {}ms",
self.entries.len(),
self.frees.len(),
"Collected and sent {entries_len} entries and {frees_len} frees to next_index_task after {}ms",
self.timer.elapsed().as_millis()
);
self.timer = Instant::now();
Expand Down
Loading

0 comments on commit 7a5dae7

Please sign in to comment.