Skip to content

Commit

Permalink
feat(code/example): Add metrics to the example app's store (#636)
Browse files Browse the repository at this point in the history
* Add metrics struct

* Add metrics to example channel app's store (#778)

* added metrics to example store

* optimized storing metrics in example app

* added time observing metrics to example app

* moving SharedRegistry import below

* serving metrics in app_channel

* renamed channel_app prefix from blockstore to app_channel

* Handle errors in metrics server

* Improve metrics collection in store

* Formatting

---------

Co-authored-by: Romain Ruetschi <romain@informal.systems>

---------

Co-authored-by: Aleksandar Ignjatijevic <aleksandarignjatijevic@informal.systems>
  • Loading branch information
romac and oakenknight authored Jan 17, 2025
1 parent 5713c8b commit 7e12159
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 23 deletions.
4 changes: 3 additions & 1 deletion code/crates/test/cli/src/cmd/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ impl StartCmd {
pub async fn start(node: impl Node, metrics: Option<MetricsConfig>) -> eyre::Result<()> {
// Enable Prometheus
if let Some(metrics) = metrics {
tokio::spawn(metrics::serve(metrics.clone()));
if metrics.enabled {
tokio::spawn(metrics::serve(metrics.listen_addr));
}
}

// Start the node
Expand Down
24 changes: 17 additions & 7 deletions code/crates/test/cli/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
use std::io;

use axum::routing::get;
use axum::Router;
use tokio::net::TcpListener;
use tracing::info;
use tokio::net::{TcpListener, ToSocketAddrs};
use tracing::{error, info};

use malachitebft_app::metrics::export;
use malachitebft_config::MetricsConfig;

#[tracing::instrument(name = "metrics", skip_all)]
pub async fn serve(config: MetricsConfig) {
pub async fn serve(listen_addr: impl ToSocketAddrs) {
if let Err(e) = inner(listen_addr).await {
error!("Metrics server failed: {e}");
}
}

async fn inner(listen_addr: impl ToSocketAddrs) -> io::Result<()> {
let app = Router::new().route("/metrics", get(get_metrics));
let listener = TcpListener::bind(config.listen_addr).await.unwrap();
let listener = TcpListener::bind(listen_addr).await?;
let local_addr = listener.local_addr()?;

info!(address = %local_addr, "Serving metrics");
axum::serve(listener, app).await?;

info!(address = %config.listen_addr, "Serving metrics");
axum::serve(listener, app).await.unwrap();
Ok(())
}

async fn get_metrics() -> String {
Expand Down
1 change: 1 addition & 0 deletions code/examples/channel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use malachitebft_test_cli::cmd::testnet::TestnetCmd;
use malachitebft_test_cli::{config, logging, runtime};

mod app;
mod metrics;
mod node;
mod state;
mod store;
Expand Down
188 changes: 188 additions & 0 deletions code/examples/channel/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use malachitebft_app_channel::app::metrics;

use metrics::prometheus::metrics::counter::Counter;
use metrics::prometheus::metrics::gauge::Gauge;
use metrics::prometheus::metrics::histogram::{exponential_buckets, Histogram};
use metrics::SharedRegistry;

#[derive(Clone, Debug)]
pub struct DbMetrics(Arc<Inner>);

impl Deref for DbMetrics {
type Target = Inner;

fn deref(&self) -> &Self::Target {
&self.0
}
}

#[derive(Debug)]
pub struct Inner {
/// Size of the database database (bytes)
db_size: Gauge,

/// Amount of data written to the database (bytes)
db_write_bytes: Counter,

/// Amount of data read from the database (bytes)
db_read_bytes: Counter,

/// Amount of key data read from the database (bytes)
db_key_read_bytes: Counter,

/// Total number of reads from the database
db_read_count: Counter,

/// Total number of writes to the database
db_write_count: Counter,

/// Total number of deletions to the database
db_delete_count: Counter,

/// Time taken to read from the database (seconds)
db_read_time: Histogram,

/// Time taken to write to the database (seconds)
db_write_time: Histogram,

/// Time taken to delete from the database (seconds)
db_delete_time: Histogram,
}

impl Inner {
pub fn new() -> Self {
Self {
db_size: Gauge::default(),
db_write_bytes: Counter::default(),
db_read_bytes: Counter::default(),
db_key_read_bytes: Counter::default(),
db_read_count: Counter::default(),
db_write_count: Counter::default(),
db_delete_count: Counter::default(),
db_read_time: Histogram::new(exponential_buckets(0.001, 2.0, 10)), // Start from 1ms
db_write_time: Histogram::new(exponential_buckets(0.001, 2.0, 10)),
db_delete_time: Histogram::new(exponential_buckets(0.001, 2.0, 10)),
}
}
}

impl Default for Inner {
fn default() -> Self {
Self::new()
}
}

impl DbMetrics {
pub fn new() -> Self {
Self(Arc::new(Inner::new()))
}

pub fn register(registry: &SharedRegistry) -> Self {
let metrics = Self::new();

registry.with_prefix("app_channel", |registry| {
registry.register(
"db_size",
"Size of the database (bytes)",
metrics.db_size.clone(),
);

registry.register(
"db_write_bytes_total",
"Amount of data written to the database (bytes)",
metrics.db_write_bytes.clone(),
);

registry.register(
"db_read_bytes_total",
"Amount of data read from the database (bytes)",
metrics.db_read_bytes.clone(),
);

registry.register(
"db_key_read_bytes_total",
"Amount of key data read from the database (bytes)",
metrics.db_key_read_bytes.clone(),
);

registry.register(
"db_read_count_total",
"Total number of reads from the database",
metrics.db_read_count.clone(),
);

registry.register(
"db_write_count_total",
"Total number of writes to the database",
metrics.db_write_count.clone(),
);

registry.register(
"db_delete_count_total",
"Total number of deletions to the database",
metrics.db_delete_count.clone(),
);

registry.register(
"db_read_time",
"Time taken to read bytes from the database (seconds)",
metrics.db_read_time.clone(),
);

registry.register(
"db_write_time",
"Time taken to write bytes to the database (seconds)",
metrics.db_write_time.clone(),
);

registry.register(
"db_delete_time",
"Time taken to delete bytes from the database (seconds)",
metrics.db_delete_time.clone(),
);
});

metrics
}

#[allow(dead_code)]
pub fn set_db_size(&self, size: usize) {
self.db_size.set(size as i64);
}

pub fn add_write_bytes(&self, bytes: u64) {
self.db_write_bytes.inc_by(bytes);
self.db_write_count.inc();
}

pub fn add_read_bytes(&self, bytes: u64) {
self.db_read_bytes.inc_by(bytes);
self.db_read_count.inc();
}

pub fn add_key_read_bytes(&self, bytes: u64) {
self.db_key_read_bytes.inc_by(bytes);
}

pub fn observe_read_time(&self, duration: Duration) {
self.db_read_time.observe(duration.as_secs_f64());
}

pub fn observe_write_time(&self, duration: Duration) {
self.db_write_time.observe(duration.as_secs_f64());
}

pub fn observe_delete_time(&self, duration: Duration) {
self.db_delete_time.observe(duration.as_secs_f64());
}
}

impl Default for DbMetrics {
fn default() -> Self {
Self::new()
}
}
12 changes: 11 additions & 1 deletion code/examples/channel/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::{Path, PathBuf};
use async_trait::async_trait;
use rand::{CryptoRng, RngCore};

use malachitebft_app_channel::app::metrics::SharedRegistry;
use malachitebft_app_channel::app::types::config::Config;
use malachitebft_app_channel::app::types::core::VotingPower;
use malachitebft_app_channel::app::types::Keypair;
Expand All @@ -17,7 +18,9 @@ use malachitebft_test::codec::proto::ProtobufCodec;
use malachitebft_test::{
Address, Genesis, Height, PrivateKey, PublicKey, TestContext, Validator, ValidatorSet,
};
use malachitebft_test_cli::metrics;

use crate::metrics::DbMetrics;
use crate::state::State;
use crate::store::Store;

Expand Down Expand Up @@ -117,7 +120,14 @@ impl Node for App {
)
.await?;

let store = Store::open(self.get_home_dir().join("store.db"))?;
let registry = SharedRegistry::global().with_moniker(&self.config.moniker);
let metrics = DbMetrics::register(&registry);

if self.config.metrics.enabled {
tokio::spawn(metrics::serve(self.config.metrics.listen_addr));
}

let store = Store::open(self.get_home_dir().join("store.db"), metrics)?;
let start_height = self.start_height.unwrap_or_default();
let mut state = State::new(ctx, address, start_height, store);

Expand Down
Loading

0 comments on commit 7e12159

Please sign in to comment.