diff --git a/Cargo.lock b/Cargo.lock index 3cd281db09f..3a40da114e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -372,6 +372,7 @@ dependencies = [ "candid", "ic-canister-client", "ic-types", + "prometheus", "rand 0.8.5", "rsa", "thiserror 2.0.3", diff --git a/rs/boundary_node/anonymization/client/BUILD.bazel b/rs/boundary_node/anonymization/client/BUILD.bazel index 9e79c57884d..500936f81d9 100644 --- a/rs/boundary_node/anonymization/client/BUILD.bazel +++ b/rs/boundary_node/anonymization/client/BUILD.bazel @@ -8,6 +8,7 @@ DEPENDENCIES = [ "//rs/types/types", "@crate_index//:anyhow", "@crate_index//:candid", + "@crate_index//:prometheus", "@crate_index//:rand", "@crate_index//:rsa", "@crate_index//:thiserror", diff --git a/rs/boundary_node/anonymization/client/Cargo.toml b/rs/boundary_node/anonymization/client/Cargo.toml index c5798d9a174..be435c7313f 100644 --- a/rs/boundary_node/anonymization/client/Cargo.toml +++ b/rs/boundary_node/anonymization/client/Cargo.toml @@ -9,6 +9,7 @@ async-trait = { workspace = true } candid = { workspace = true } ic-canister-client = { path = "../../../canister_client" } ic-types = { path = "../../../types/types" } +prometheus = { workspace = true } rand = { workspace = true } rsa = { workspace = true } thiserror = { workspace = true } diff --git a/rs/boundary_node/anonymization/client/src/lib.rs b/rs/boundary_node/anonymization/client/src/lib.rs index 16a614ceab8..92c0b1c672c 100644 --- a/rs/boundary_node/anonymization/client/src/lib.rs +++ b/rs/boundary_node/anonymization/client/src/lib.rs @@ -8,6 +8,10 @@ use anyhow::{anyhow, Context, Error}; use async_trait::async_trait; use candid::{Decode, Encode, Principal}; use ic_canister_client::Agent; +use prometheus::{ + register_histogram_vec_with_registry, register_int_counter_vec_with_registry, HistogramOpts, + HistogramVec, IntCounterVec, Registry, +}; use rsa::{ pkcs1::{DecodeRsaPublicKey, EncodeRsaPublicKey}, rand_core::CryptoRngCore, @@ -20,7 +24,58 @@ use tokio::{sync::Mutex, time::sleep}; const SALT_SIZE: usize = 64; const RSA_KEY_SIZE: usize = 2048; -struct WithLogs(T); +#[derive(Clone, Debug)] +pub struct MetricParams { + pub action: String, + pub counter: IntCounterVec, + pub recorder: HistogramVec, +} + +impl MetricParams { + pub fn new(registry: &Registry, action: &str) -> Self { + Self::new_with_opts(registry, action, &["status"], None) + } + + pub fn new_with_opts( + registry: &Registry, + action: &str, + labels: &[&str], + buckets: Option<&[f64]>, + ) -> Self { + let mut recorder_opts = HistogramOpts::new( + format!("{action}_duration_sec"), // name + format!("Records the duration of {action} calls in seconds"), // description + ); + + // Set histogram buckets if given + buckets.inspect(|bs| { + recorder_opts.buckets = bs.to_vec(); + }); + + Self { + action: action.to_string(), + + // Count + counter: register_int_counter_vec_with_registry!( + format!("{action}_total"), // name + format!("Counts occurrences of {action} calls"), // description + labels, // labels + registry, // registry + ) + .expect("failed to register counter"), + + // Duration + recorder: register_histogram_vec_with_registry!( + recorder_opts, // options + labels, // labels + registry, // registry + ) + .expect("failed to register histogram"), + } + } +} + +struct WithMetrics(T, Option); pub struct ThrottleParams { pub d: Duration, @@ -92,7 +147,7 @@ pub trait Register: Sync + Send { } #[async_trait] -impl Register for WithLogs { +impl Register for WithMetrics { async fn register(&self, pubkey: &[u8]) -> Result<(), RegisterError> { let start_time = Instant::now(); @@ -108,11 +163,24 @@ impl Register for WithLogs { let duration = start_time.elapsed().as_secs_f64(); + // Log println!( "action = 'register', status = {status}, duration = {duration}, error = {:?}", out.as_ref().err() ); + // Metrics + if let Some(MetricParams { + counter, recorder, .. + }) = &self.1 + { + // Count + counter.with_label_values(&[status]).inc(); + + // Latency + recorder.with_label_values(&[status]).observe(duration); + } + return out; } } @@ -181,7 +249,7 @@ pub trait Query: Sync + Send { } #[async_trait] -impl Query for WithLogs { +impl Query for WithMetrics { async fn query(&self) -> Result, QueryError> { let start_time = Instant::now(); @@ -202,11 +270,24 @@ impl Query for WithLogs { let duration = start_time.elapsed().as_secs_f64(); + // Log println!( "action = 'query', status = {status}, duration = {duration}, error = {:?}", out.as_ref().err() ); + // Metrics + if let Some(MetricParams { + counter, recorder, .. + }) = &self.1 + { + // Count + counter.with_label_values(&[status]).inc(); + + // Latency + recorder.with_label_values(&[status]).observe(duration); + } + return out; } } @@ -226,7 +307,7 @@ pub trait Submit: Sync + Send { } #[async_trait] -impl Submit for WithLogs { +impl Submit for WithMetrics { async fn submit(&self, vs: &[Pair]) -> Result<(), SubmitError> { let start_time = Instant::now(); @@ -242,11 +323,24 @@ impl Submit for WithLogs { let duration = start_time.elapsed().as_secs_f64(); + // Log println!( "action = 'submit', status = {status}, duration = {duration}, error = {:?}", out.as_ref().err() ); + // Metrics + if let Some(MetricParams { + counter, recorder, .. + }) = &self.1 + { + // Count + counter.with_label_values(&[status]).inc(); + + // Latency + recorder.with_label_values(&[status]).observe(duration); + } + return out; } } @@ -424,23 +518,43 @@ pub struct CanisterMethods { submit: Arc, } -impl From for CanisterMethods { - fn from(value: Canister) -> Self { +pub struct CanisterMethodsBuilder<'a> { + canister: Canister, + registry: Option<&'a Registry>, +} + +impl<'a> CanisterMethodsBuilder<'a> { + pub fn new(c: Canister) -> Self { Self { - register: Arc::new({ - let v = value.clone(); - let v = WithLogs(v); - WithThrottle(v, ThrottleParams::new(Duration::from_secs(10))) - }), - query: Arc::new({ - let v = value.clone(); - let v = WithLogs(v); - WithThrottle(v, ThrottleParams::new(Duration::from_secs(10))) - }), - submit: Arc::new({ - let v = value.clone(); - WithLogs(v) - }), + canister: c, + registry: None, + } + } + + pub fn with_metrics(mut self, r: &'a Registry) -> Self { + self.registry = Some(r); + self + } + + pub fn build(self) -> CanisterMethods { + CanisterMethods { + register: { + let v = self.canister.clone(); + let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "register"))); + let v = WithThrottle(v, ThrottleParams::new(Duration::from_secs(10))); + Arc::new(v) + }, + query: { + let v = self.canister.clone(); + let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "query"))); + let v = WithThrottle(v, ThrottleParams::new(Duration::from_secs(10))); + Arc::new(v) + }, + submit: { + let v = self.canister.clone(); + let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "submit"))); + Arc::new(v) + }, } } } diff --git a/rs/boundary_node/ic_boundary/src/core.rs b/rs/boundary_node/ic_boundary/src/core.rs index 9baed78e175..28c43c03191 100644 --- a/rs/boundary_node/ic_boundary/src/core.rs +++ b/rs/boundary_node/ic_boundary/src/core.rs @@ -7,7 +7,9 @@ use std::{ }; use anonymization_client::{ - Canister as AnonymizationCanister, Track, Tracker as AnonymizationTracker, + Canister as AnonymizationCanister, + CanisterMethodsBuilder as AnonymizationCanisterMethodsBuilder, Track, + Tracker as AnonymizationTracker, }; use anyhow::{anyhow, Context, Error}; use arc_swap::ArcSwapOption; @@ -431,7 +433,10 @@ pub async fn main(cli: Cli) -> Result<(), Error> { // HTTP Logs Anonymization let tracker = if let Some(v) = cli.obs.obs_log_anonymization_canister_id { let canister = AnonymizationCanister::new(agent.clone().unwrap(), v); - Some(AnonymizationTracker::new(Box::new(OsRng), canister.into())?) + let cm = AnonymizationCanisterMethodsBuilder::new(canister) + .with_metrics(&metrics_registry) + .build(); + Some(AnonymizationTracker::new(Box::new(OsRng), cm)?) } else { None };