Skip to content

Commit

Permalink
perf(metrics): cache handles for database transaction metrics (#6730)
Browse files Browse the repository at this point in the history
  • Loading branch information
sevazhidkov authored Feb 22, 2024
1 parent 9c7b2b5 commit 755ce9e
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 41 deletions.
11 changes: 4 additions & 7 deletions crates/storage/db/src/implementation/mdbx/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
use super::cursor::Cursor;
use crate::{
metrics::{
DatabaseEnvMetrics, Operation, TransactionMetrics, TransactionMode, TransactionOutcome,
},
metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
table::{Compress, DupSort, Encode, Table, TableImporter},
tables::{utils::decode_one, Tables},
transaction::{DbTx, DbTxMut},
Expand Down Expand Up @@ -55,7 +53,7 @@ impl<K: TransactionKind> Tx<K> {
) -> Self {
let metrics_handler = if let Some(metrics) = metrics {
let handler = MetricsHandler::<K>::new(inner.id(), metrics);
TransactionMetrics::record_open(handler.transaction_mode());
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
handler.log_transaction_opened();
Some(handler)
} else {
Expand Down Expand Up @@ -128,7 +126,7 @@ impl<K: TransactionKind> Tx<K> {

let (result, commit_latency, close_duration) = run(self);
let open_duration = metrics_handler.start.elapsed();
TransactionMetrics::record_close(
metrics_handler.env_metrics.record_closed_transaction(
metrics_handler.transaction_mode(),
outcome,
open_duration,
Expand Down Expand Up @@ -244,8 +242,7 @@ impl<K: TransactionKind> Drop for MetricsHandler<K> {
fn drop(&mut self) {
if !self.close_recorded {
self.log_backtrace_on_long_read_transaction();

TransactionMetrics::record_close(
self.env_metrics.record_closed_transaction(
self.transaction_mode(),
TransactionOutcome::Drop,
self.start.elapsed(),
Expand Down
152 changes: 118 additions & 34 deletions crates/storage/db/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::Tables;
use metrics::{Gauge, Histogram};
use reth_libmdbx::CommitLatency;
use reth_metrics::{metrics::Counter, Metrics};
use rustc_hash::FxHasher;
use rustc_hash::{FxHashMap, FxHasher};
use std::{
collections::HashMap,
hash::BuildHasherDefault,
Expand All @@ -20,14 +20,31 @@ const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
#[derive(Debug)]
pub struct DatabaseEnvMetrics {
/// Caches OperationMetrics handles for each table and operation tuple.
operations: HashMap<(Tables, Operation), OperationMetrics, BuildHasherDefault<FxHasher>>,
operations: FxHashMap<(Tables, Operation), OperationMetrics>,
/// Caches TransactionMetrics handles for counters grouped by only transaction mode.
/// Updated both at tx open and close.
transactions: FxHashMap<TransactionMode, TransactionMetrics>,
/// Caches TransactionOutcomeMetrics handles for counters grouped by transaction mode and
/// outcome. Can only be updated at tx close, as outcome is only known at that point.
transaction_outcomes:
FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
}

impl DatabaseEnvMetrics {
pub(crate) fn new() -> Self {
// Pre-populate the map with all possible table and operation combinations
// Pre-populate metric handle maps with all possible combinations of labels
// to avoid runtime locks on the map when recording metrics.
let mut operations = HashMap::with_capacity_and_hasher(
Self {
operations: Self::generate_operation_handles(),
transactions: Self::generate_transaction_handles(),
transaction_outcomes: Self::generate_transaction_outcome_handles(),
}
}

/// Generate a map of all possible operation handles for each table and operation tuple.
/// Used for tracking all operation metrics.
fn generate_operation_handles() -> FxHashMap<(Tables, Operation), OperationMetrics> {
let mut operations = FxHashMap::with_capacity_and_hasher(
Tables::COUNT * Operation::COUNT,
BuildHasherDefault::<FxHasher>::default(),
);
Expand All @@ -42,7 +59,45 @@ impl DatabaseEnvMetrics {
);
}
}
Self { operations }
operations
}

/// Generate a map of all possible transaction modes to metric handles.
/// Used for tracking a counter of open transactions.
fn generate_transaction_handles() -> FxHashMap<TransactionMode, TransactionMetrics> {
TransactionMode::iter()
.map(|mode| {
(
mode,
TransactionMetrics::new_with_labels(&[(
Labels::TransactionMode.as_str(),
mode.as_str(),
)]),
)
})
.collect()
}

/// Generate a map of all possible transaction mode and outcome handles.
/// Used for tracking various stats for finished transactions (e.g. commit duration).
fn generate_transaction_outcome_handles(
) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> {
let mut transaction_outcomes = HashMap::with_capacity_and_hasher(
TransactionMode::COUNT * TransactionOutcome::COUNT,
BuildHasherDefault::<FxHasher>::default(),
);
for mode in TransactionMode::iter() {
for outcome in TransactionOutcome::iter() {
transaction_outcomes.insert(
(mode, outcome),
TransactionOutcomeMetrics::new_with_labels(&[
(Labels::TransactionMode.as_str(), mode.as_str()),
(Labels::TransactionOutcome.as_str(), outcome.as_str()),
]),
);
}
}
transaction_outcomes
}

/// Record a metric for database operation executed in `f`.
Expand All @@ -59,10 +114,38 @@ impl DatabaseEnvMetrics {
.expect("operation & table metric handle not found")
.record(value_size, f)
}

/// Record metrics for opening a database transaction.
pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
self.transactions
.get(&mode)
.expect("transaction mode metric handle not found")
.record_open();
}

/// Record metrics for closing a database transactions.
pub(crate) fn record_closed_transaction(
&self,
mode: TransactionMode,
outcome: TransactionOutcome,
open_duration: Duration,
close_duration: Option<Duration>,
commit_latency: Option<CommitLatency>,
) {
self.transactions
.get(&mode)
.expect("transaction mode metric handle not found")
.record_close();

self.transaction_outcomes
.get(&(mode, outcome))
.expect("transaction outcome metric handle not found")
.record(open_duration, close_duration, commit_latency);
}
}

/// Transaction mode for the database, either read-only or read-write.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
pub(crate) enum TransactionMode {
/// Read-only transaction mode.
ReadOnly,
Expand All @@ -85,7 +168,7 @@ impl TransactionMode {
}

/// Transaction outcome after a database operation - commit, abort, or drop.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
pub(crate) enum TransactionOutcome {
/// Successful commit of the transaction.
Commit,
Expand Down Expand Up @@ -175,6 +258,21 @@ impl Labels {
pub(crate) struct TransactionMetrics {
/// Total number of currently open database transactions
open_total: Gauge,
}

impl TransactionMetrics {
pub(crate) fn record_open(&self) {
self.open_total.increment(1.0);
}

pub(crate) fn record_close(&self) {
self.open_total.decrement(1.0);
}
}

#[derive(Metrics, Clone)]
#[metrics(scope = "database.transaction")]
pub(crate) struct TransactionOutcomeMetrics {
/// The time a database transaction has been open
open_duration_seconds: Histogram,
/// The time it took to close a database transaction
Expand All @@ -198,44 +296,30 @@ pub(crate) struct TransactionMetrics {
commit_gc_cputime_duration_seconds: Histogram,
}

impl TransactionMetrics {
/// Record transaction opening.
pub(crate) fn record_open(mode: TransactionMode) {
let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]);
metrics.open_total.increment(1.0);
}

impl TransactionOutcomeMetrics {
/// Record transaction closing with the duration it was open and the duration it took to close
/// it.
pub(crate) fn record_close(
mode: TransactionMode,
outcome: TransactionOutcome,
pub(crate) fn record(
&self,
open_duration: Duration,
close_duration: Option<Duration>,
commit_latency: Option<CommitLatency>,
) {
let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]);
metrics.open_total.decrement(1.0);

let metrics = Self::new_with_labels(&[
(Labels::TransactionMode.as_str(), mode.as_str()),
(Labels::TransactionOutcome.as_str(), outcome.as_str()),
]);
metrics.open_duration_seconds.record(open_duration);
self.open_duration_seconds.record(open_duration);

if let Some(close_duration) = close_duration {
metrics.close_duration_seconds.record(close_duration)
self.close_duration_seconds.record(close_duration)
}

if let Some(commit_latency) = commit_latency {
metrics.commit_preparation_duration_seconds.record(commit_latency.preparation());
metrics.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
metrics.commit_audit_duration_seconds.record(commit_latency.audit());
metrics.commit_write_duration_seconds.record(commit_latency.write());
metrics.commit_sync_duration_seconds.record(commit_latency.sync());
metrics.commit_ending_duration_seconds.record(commit_latency.ending());
metrics.commit_whole_duration_seconds.record(commit_latency.whole());
metrics.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
self.commit_preparation_duration_seconds.record(commit_latency.preparation());
self.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
self.commit_audit_duration_seconds.record(commit_latency.audit());
self.commit_write_duration_seconds.record(commit_latency.write());
self.commit_sync_duration_seconds.record(commit_latency.sync());
self.commit_ending_duration_seconds.record(commit_latency.ending());
self.commit_whole_duration_seconds.record(commit_latency.whole());
self.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
}
}
}
Expand Down

0 comments on commit 755ce9e

Please sign in to comment.