From 7e121597d8be94b8a720a4c8fcd4bed832e4d0ab Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 17 Jan 2025 16:23:11 +0100 Subject: [PATCH] feat(code/example): Add metrics to the example app's store (#636) * Add metrics struct * Add metrics to example channel app's store (#778) * added metrics to example store * optimized storing metrics in example app * added time observing metrics to example app * moving SharedRegistry import below * serving metrics in app_channel * renamed channel_app prefix from blockstore to app_channel * Handle errors in metrics server * Improve metrics collection in store * Formatting --------- Co-authored-by: Romain Ruetschi --------- Co-authored-by: Aleksandar Ignjatijevic --- code/crates/test/cli/src/cmd/start.rs | 4 +- code/crates/test/cli/src/metrics.rs | 24 +++- code/examples/channel/src/main.rs | 1 + code/examples/channel/src/metrics.rs | 188 ++++++++++++++++++++++++++ code/examples/channel/src/node.rs | 12 +- code/examples/channel/src/store.rs | 98 ++++++++++++-- 6 files changed, 304 insertions(+), 23 deletions(-) create mode 100644 code/examples/channel/src/metrics.rs diff --git a/code/crates/test/cli/src/cmd/start.rs b/code/crates/test/cli/src/cmd/start.rs index 15467bebf..bd0a5236c 100644 --- a/code/crates/test/cli/src/cmd/start.rs +++ b/code/crates/test/cli/src/cmd/start.rs @@ -29,7 +29,9 @@ impl StartCmd { pub async fn start(node: impl Node, metrics: Option) -> eyre::Result<()> { // Enable Prometheus if let Some(metrics) = metrics { - tokio::spawn(metrics::serve(metrics.clone())); + if metrics.enabled { + tokio::spawn(metrics::serve(metrics.listen_addr)); + } } // Start the node diff --git a/code/crates/test/cli/src/metrics.rs b/code/crates/test/cli/src/metrics.rs index 80c193167..6acb427f0 100644 --- a/code/crates/test/cli/src/metrics.rs +++ b/code/crates/test/cli/src/metrics.rs @@ -1,18 +1,28 @@ +use std::io; + use axum::routing::get; use axum::Router; -use tokio::net::TcpListener; -use tracing::info; +use tokio::net::{TcpListener, ToSocketAddrs}; +use tracing::{error, info}; use malachitebft_app::metrics::export; -use malachitebft_config::MetricsConfig; #[tracing::instrument(name = "metrics", skip_all)] -pub async fn serve(config: MetricsConfig) { +pub async fn serve(listen_addr: impl ToSocketAddrs) { + if let Err(e) = inner(listen_addr).await { + error!("Metrics server failed: {e}"); + } +} + +async fn inner(listen_addr: impl ToSocketAddrs) -> io::Result<()> { let app = Router::new().route("/metrics", get(get_metrics)); - let listener = TcpListener::bind(config.listen_addr).await.unwrap(); + let listener = TcpListener::bind(listen_addr).await?; + let local_addr = listener.local_addr()?; + + info!(address = %local_addr, "Serving metrics"); + axum::serve(listener, app).await?; - info!(address = %config.listen_addr, "Serving metrics"); - axum::serve(listener, app).await.unwrap(); + Ok(()) } async fn get_metrics() -> String { diff --git a/code/examples/channel/src/main.rs b/code/examples/channel/src/main.rs index 887f30cd3..cc0ebfcd4 100644 --- a/code/examples/channel/src/main.rs +++ b/code/examples/channel/src/main.rs @@ -12,6 +12,7 @@ use malachitebft_test_cli::cmd::testnet::TestnetCmd; use malachitebft_test_cli::{config, logging, runtime}; mod app; +mod metrics; mod node; mod state; mod store; diff --git a/code/examples/channel/src/metrics.rs b/code/examples/channel/src/metrics.rs new file mode 100644 index 000000000..633aeaf96 --- /dev/null +++ b/code/examples/channel/src/metrics.rs @@ -0,0 +1,188 @@ +use std::ops::Deref; +use std::sync::Arc; +use std::time::Duration; + +use malachitebft_app_channel::app::metrics; + +use metrics::prometheus::metrics::counter::Counter; +use metrics::prometheus::metrics::gauge::Gauge; +use metrics::prometheus::metrics::histogram::{exponential_buckets, Histogram}; +use metrics::SharedRegistry; + +#[derive(Clone, Debug)] +pub struct DbMetrics(Arc); + +impl Deref for DbMetrics { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Debug)] +pub struct Inner { + /// Size of the database database (bytes) + db_size: Gauge, + + /// Amount of data written to the database (bytes) + db_write_bytes: Counter, + + /// Amount of data read from the database (bytes) + db_read_bytes: Counter, + + /// Amount of key data read from the database (bytes) + db_key_read_bytes: Counter, + + /// Total number of reads from the database + db_read_count: Counter, + + /// Total number of writes to the database + db_write_count: Counter, + + /// Total number of deletions to the database + db_delete_count: Counter, + + /// Time taken to read from the database (seconds) + db_read_time: Histogram, + + /// Time taken to write to the database (seconds) + db_write_time: Histogram, + + /// Time taken to delete from the database (seconds) + db_delete_time: Histogram, +} + +impl Inner { + pub fn new() -> Self { + Self { + db_size: Gauge::default(), + db_write_bytes: Counter::default(), + db_read_bytes: Counter::default(), + db_key_read_bytes: Counter::default(), + db_read_count: Counter::default(), + db_write_count: Counter::default(), + db_delete_count: Counter::default(), + db_read_time: Histogram::new(exponential_buckets(0.001, 2.0, 10)), // Start from 1ms + db_write_time: Histogram::new(exponential_buckets(0.001, 2.0, 10)), + db_delete_time: Histogram::new(exponential_buckets(0.001, 2.0, 10)), + } + } +} + +impl Default for Inner { + fn default() -> Self { + Self::new() + } +} + +impl DbMetrics { + pub fn new() -> Self { + Self(Arc::new(Inner::new())) + } + + pub fn register(registry: &SharedRegistry) -> Self { + let metrics = Self::new(); + + registry.with_prefix("app_channel", |registry| { + registry.register( + "db_size", + "Size of the database (bytes)", + metrics.db_size.clone(), + ); + + registry.register( + "db_write_bytes_total", + "Amount of data written to the database (bytes)", + metrics.db_write_bytes.clone(), + ); + + registry.register( + "db_read_bytes_total", + "Amount of data read from the database (bytes)", + metrics.db_read_bytes.clone(), + ); + + registry.register( + "db_key_read_bytes_total", + "Amount of key data read from the database (bytes)", + metrics.db_key_read_bytes.clone(), + ); + + registry.register( + "db_read_count_total", + "Total number of reads from the database", + metrics.db_read_count.clone(), + ); + + registry.register( + "db_write_count_total", + "Total number of writes to the database", + metrics.db_write_count.clone(), + ); + + registry.register( + "db_delete_count_total", + "Total number of deletions to the database", + metrics.db_delete_count.clone(), + ); + + registry.register( + "db_read_time", + "Time taken to read bytes from the database (seconds)", + metrics.db_read_time.clone(), + ); + + registry.register( + "db_write_time", + "Time taken to write bytes to the database (seconds)", + metrics.db_write_time.clone(), + ); + + registry.register( + "db_delete_time", + "Time taken to delete bytes from the database (seconds)", + metrics.db_delete_time.clone(), + ); + }); + + metrics + } + + #[allow(dead_code)] + pub fn set_db_size(&self, size: usize) { + self.db_size.set(size as i64); + } + + pub fn add_write_bytes(&self, bytes: u64) { + self.db_write_bytes.inc_by(bytes); + self.db_write_count.inc(); + } + + pub fn add_read_bytes(&self, bytes: u64) { + self.db_read_bytes.inc_by(bytes); + self.db_read_count.inc(); + } + + pub fn add_key_read_bytes(&self, bytes: u64) { + self.db_key_read_bytes.inc_by(bytes); + } + + pub fn observe_read_time(&self, duration: Duration) { + self.db_read_time.observe(duration.as_secs_f64()); + } + + pub fn observe_write_time(&self, duration: Duration) { + self.db_write_time.observe(duration.as_secs_f64()); + } + + pub fn observe_delete_time(&self, duration: Duration) { + self.db_delete_time.observe(duration.as_secs_f64()); + } +} + +impl Default for DbMetrics { + fn default() -> Self { + Self::new() + } +} diff --git a/code/examples/channel/src/node.rs b/code/examples/channel/src/node.rs index 6f1d7589d..8ed6ac5f4 100644 --- a/code/examples/channel/src/node.rs +++ b/code/examples/channel/src/node.rs @@ -6,6 +6,7 @@ use std::path::{Path, PathBuf}; use async_trait::async_trait; use rand::{CryptoRng, RngCore}; +use malachitebft_app_channel::app::metrics::SharedRegistry; use malachitebft_app_channel::app::types::config::Config; use malachitebft_app_channel::app::types::core::VotingPower; use malachitebft_app_channel::app::types::Keypair; @@ -17,7 +18,9 @@ use malachitebft_test::codec::proto::ProtobufCodec; use malachitebft_test::{ Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet, }; +use malachitebft_test_cli::metrics; +use crate::metrics::DbMetrics; use crate::state::State; use crate::store::Store; @@ -117,7 +120,14 @@ impl Node for App { ) .await?; - let store = Store::open(self.get_home_dir().join("store.db"))?; + let registry = SharedRegistry::global().with_moniker(&self.config.moniker); + let metrics = DbMetrics::register(®istry); + + if self.config.metrics.enabled { + tokio::spawn(metrics::serve(self.config.metrics.listen_addr)); + } + + let store = Store::open(self.get_home_dir().join("store.db"), metrics)?; let start_height = self.start_height.unwrap_or_default(); let mut state = State::new(ctx, address, start_height, store); diff --git a/code/examples/channel/src/store.rs b/code/examples/channel/src/store.rs index 5fdf34305..900b31337 100644 --- a/code/examples/channel/src/store.rs +++ b/code/examples/channel/src/store.rs @@ -1,6 +1,8 @@ +use std::mem::size_of; use std::ops::RangeBounds; use std::path::Path; use std::sync::Arc; +use std::time::Instant; use bytes::Bytes; use prost::Message; @@ -20,6 +22,8 @@ use malachitebft_test::{Height, TestContext, Value}; mod keys; use keys::{HeightKey, UndecidedValueKey}; +use crate::metrics::DbMetrics; + #[derive(Clone, Debug)] pub struct DecidedValue { pub value: Value, @@ -71,28 +75,47 @@ const UNDECIDED_PROPOSALS_TABLE: redb::TableDefinition) -> Result { + fn new(path: impl AsRef, metrics: DbMetrics) -> Result { Ok(Self { db: redb::Database::create(path).map_err(StoreError::Database)?, + metrics, }) } fn get_decided_value(&self, height: Height) -> Result, StoreError> { + let start = Instant::now(); + let mut read_bytes = 0; + let tx = self.db.begin_read()?; + let value = { let table = tx.open_table(DECIDED_VALUES_TABLE)?; let value = table.get(&height)?; - value.and_then(|value| Value::from_bytes(&value.value()).ok()) + value.and_then(|value| { + let bytes = value.value(); + read_bytes = bytes.len() as u64; + Value::from_bytes(&bytes).ok() + }) }; + let certificate = { let table = tx.open_table(CERTIFICATES_TABLE)?; let value = table.get(&height)?; - value.and_then(|value| decode_certificate(&value.value()).ok()) + value.and_then(|value| { + let bytes = value.value(); + read_bytes += bytes.len() as u64; + decode_certificate(&bytes).ok() + }) }; + self.metrics.observe_read_time(start.elapsed()); + self.metrics.add_read_bytes(read_bytes); + self.metrics.add_key_read_bytes(size_of::() as u64); + let decided_value = value .zip(certificate) .map(|(value, certificate)| DecidedValue { value, certificate }); @@ -101,19 +124,31 @@ impl Db { } fn insert_decided_value(&self, decided_value: DecidedValue) -> Result<(), StoreError> { - let height = decided_value.certificate.height; + let start = Instant::now(); + let mut write_bytes = 0; + let height = decided_value.certificate.height; let tx = self.db.begin_write()?; + { let mut values = tx.open_table(DECIDED_VALUES_TABLE)?; - values.insert(height, decided_value.value.to_bytes()?.to_vec())?; + let values_bytes = decided_value.value.to_bytes()?.to_vec(); + write_bytes += values_bytes.len() as u64; + values.insert(height, values_bytes)?; } + { let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; - certificates.insert(height, encode_certificate(&decided_value.certificate)?)?; + let encoded_certificate = encode_certificate(&decided_value.certificate)?; + write_bytes += encoded_certificate.len() as u64; + certificates.insert(height, encoded_certificate)?; } + tx.commit()?; + self.metrics.observe_write_time(start.elapsed()); + self.metrics.add_write_bytes(write_bytes); + Ok(()) } @@ -123,19 +158,30 @@ impl Db { height: Height, round: Round, ) -> Result>, StoreError> { + let start = Instant::now(); + let mut read_bytes = 0; + let tx = self.db.begin_read()?; let table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; let value = if let Ok(Some(value)) = table.get(&(height, round)) { - Some( - ProtobufCodec - .decode(Bytes::from(value.value())) - .map_err(StoreError::Protobuf)?, - ) + let bytes = value.value(); + read_bytes += bytes.len() as u64; + + let proposal = ProtobufCodec + .decode(Bytes::from(bytes)) + .map_err(StoreError::Protobuf)?; + + Some(proposal) } else { None }; + self.metrics.observe_read_time(start.elapsed()); + self.metrics.add_read_bytes(read_bytes); + self.metrics + .add_key_read_bytes(size_of::<(Height, Round)>() as u64); + Ok(value) } @@ -143,14 +189,21 @@ impl Db { &self, proposal: ProposedValue, ) -> Result<(), StoreError> { + let start = Instant::now(); + let key = (proposal.height, proposal.round); let value = ProtobufCodec.encode(&proposal)?; + let tx = self.db.begin_write()?; { let mut table = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; table.insert(key, value.to_vec())?; } tx.commit()?; + + self.metrics.observe_write_time(start.elapsed()); + self.metrics.add_write_bytes(value.len() as u64); + Ok(()) } @@ -185,7 +238,10 @@ impl Db { } fn prune(&self, retain_height: Height) -> Result, StoreError> { + let start = Instant::now(); + let tx = self.db.begin_write().unwrap(); + let pruned = { let mut undecided = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; let keys = self.undecided_proposals_range(&undecided, ..(retain_height, Round::Nil))?; @@ -203,15 +259,25 @@ impl Db { } keys }; + tx.commit()?; + self.metrics.observe_delete_time(start.elapsed()); + Ok(pruned) } fn min_decided_value_height(&self) -> Option { + let start = Instant::now(); + let tx = self.db.begin_read().unwrap(); let table = tx.open_table(DECIDED_VALUES_TABLE).unwrap(); - let (key, _) = table.first().ok()??; + let (key, value) = table.first().ok()??; + + self.metrics.observe_read_time(start.elapsed()); + self.metrics.add_read_bytes(value.value().len() as u64); + self.metrics.add_key_read_bytes(size_of::() as u64); + Some(key.value()) } @@ -224,11 +290,14 @@ impl Db { fn create_tables(&self) -> Result<(), StoreError> { let tx = self.db.begin_write()?; + // Implicitly creates the tables if they do not exist yet let _ = tx.open_table(DECIDED_VALUES_TABLE)?; let _ = tx.open_table(CERTIFICATES_TABLE)?; let _ = tx.open_table(UNDECIDED_PROPOSALS_TABLE)?; + tx.commit()?; + Ok(()) } } @@ -239,8 +308,8 @@ pub struct Store { } impl Store { - pub fn open(path: impl AsRef) -> Result { - let db = Db::new(path)?; + pub fn open(path: impl AsRef, metrics: DbMetrics) -> Result { + let db = Db::new(path, metrics)?; db.create_tables()?; Ok(Self { db: Arc::new(db) }) @@ -267,6 +336,7 @@ impl Store { height: Height, ) -> Result, StoreError> { let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.get_decided_value(height)).await? }