Skip to content

Commit

Permalink
feat(BOUN-1300): Add metrics to anonymization client (#3053)
Browse files Browse the repository at this point in the history
This change allows setting a metrics registry when creating the canister
client for log anonymization.
  • Loading branch information
rikonor authored Dec 10, 2024
1 parent 49f6b11 commit 9a2466b
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/boundary_node/anonymization/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ DEPENDENCIES = [
"//rs/types/types",
"@crate_index//:anyhow",
"@crate_index//:candid",
"@crate_index//:prometheus",
"@crate_index//:rand",
"@crate_index//:rsa",
"@crate_index//:thiserror",
Expand Down
1 change: 1 addition & 0 deletions rs/boundary_node/anonymization/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ async-trait = { workspace = true }
candid = { workspace = true }
ic-canister-client = { path = "../../../canister_client" }
ic-types = { path = "../../../types/types" }
prometheus = { workspace = true }
rand = { workspace = true }
rsa = { workspace = true }
thiserror = { workspace = true }
Expand Down
154 changes: 134 additions & 20 deletions rs/boundary_node/anonymization/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use anyhow::{anyhow, Context, Error};
use async_trait::async_trait;
use candid::{Decode, Encode, Principal};
use ic_canister_client::Agent;
use prometheus::{
register_histogram_vec_with_registry, register_int_counter_vec_with_registry, HistogramOpts,
HistogramVec, IntCounterVec, Registry,
};
use rsa::{
pkcs1::{DecodeRsaPublicKey, EncodeRsaPublicKey},
rand_core::CryptoRngCore,
Expand All @@ -20,7 +24,58 @@ use tokio::{sync::Mutex, time::sleep};
const SALT_SIZE: usize = 64;
const RSA_KEY_SIZE: usize = 2048;

struct WithLogs<T>(T);
#[derive(Clone, Debug)]
pub struct MetricParams {
pub action: String,
pub counter: IntCounterVec,
pub recorder: HistogramVec,
}

impl MetricParams {
pub fn new(registry: &Registry, action: &str) -> Self {
Self::new_with_opts(registry, action, &["status"], None)
}

pub fn new_with_opts(
registry: &Registry,
action: &str,
labels: &[&str],
buckets: Option<&[f64]>,
) -> Self {
let mut recorder_opts = HistogramOpts::new(
format!("{action}_duration_sec"), // name
format!("Records the duration of {action} calls in seconds"), // description
);

// Set histogram buckets if given
buckets.inspect(|bs| {
recorder_opts.buckets = bs.to_vec();
});

Self {
action: action.to_string(),

// Count
counter: register_int_counter_vec_with_registry!(
format!("{action}_total"), // name
format!("Counts occurrences of {action} calls"), // description
labels, // labels
registry, // registry
)
.expect("failed to register counter"),

// Duration
recorder: register_histogram_vec_with_registry!(
recorder_opts, // options
labels, // labels
registry, // registry
)
.expect("failed to register histogram"),
}
}
}

struct WithMetrics<T>(T, Option<MetricParams>);

pub struct ThrottleParams {
pub d: Duration,
Expand Down Expand Up @@ -92,7 +147,7 @@ pub trait Register: Sync + Send {
}

#[async_trait]
impl<T: Register> Register for WithLogs<T> {
impl<T: Register> Register for WithMetrics<T> {
async fn register(&self, pubkey: &[u8]) -> Result<(), RegisterError> {
let start_time = Instant::now();

Expand All @@ -108,11 +163,24 @@ impl<T: Register> Register for WithLogs<T> {

let duration = start_time.elapsed().as_secs_f64();

// Log
println!(
"action = 'register', status = {status}, duration = {duration}, error = {:?}",
out.as_ref().err()
);

// Metrics
if let Some(MetricParams {
counter, recorder, ..
}) = &self.1
{
// Count
counter.with_label_values(&[status]).inc();

// Latency
recorder.with_label_values(&[status]).observe(duration);
}

return out;
}
}
Expand Down Expand Up @@ -181,7 +249,7 @@ pub trait Query: Sync + Send {
}

#[async_trait]
impl<T: Query> Query for WithLogs<T> {
impl<T: Query> Query for WithMetrics<T> {
async fn query(&self) -> Result<Vec<u8>, QueryError> {
let start_time = Instant::now();

Expand All @@ -202,11 +270,24 @@ impl<T: Query> Query for WithLogs<T> {

let duration = start_time.elapsed().as_secs_f64();

// Log
println!(
"action = 'query', status = {status}, duration = {duration}, error = {:?}",
out.as_ref().err()
);

// Metrics
if let Some(MetricParams {
counter, recorder, ..
}) = &self.1
{
// Count
counter.with_label_values(&[status]).inc();

// Latency
recorder.with_label_values(&[status]).observe(duration);
}

return out;
}
}
Expand All @@ -226,7 +307,7 @@ pub trait Submit: Sync + Send {
}

#[async_trait]
impl<T: Submit> Submit for WithLogs<T> {
impl<T: Submit> Submit for WithMetrics<T> {
async fn submit(&self, vs: &[Pair]) -> Result<(), SubmitError> {
let start_time = Instant::now();

Expand All @@ -242,11 +323,24 @@ impl<T: Submit> Submit for WithLogs<T> {

let duration = start_time.elapsed().as_secs_f64();

// Log
println!(
"action = 'submit', status = {status}, duration = {duration}, error = {:?}",
out.as_ref().err()
);

// Metrics
if let Some(MetricParams {
counter, recorder, ..
}) = &self.1
{
// Count
counter.with_label_values(&[status]).inc();

// Latency
recorder.with_label_values(&[status]).observe(duration);
}

return out;
}
}
Expand Down Expand Up @@ -424,23 +518,43 @@ pub struct CanisterMethods {
submit: Arc<dyn Submit>,
}

impl From<Canister> for CanisterMethods {
fn from(value: Canister) -> Self {
pub struct CanisterMethodsBuilder<'a> {
canister: Canister,
registry: Option<&'a Registry>,
}

impl<'a> CanisterMethodsBuilder<'a> {
pub fn new(c: Canister) -> Self {
Self {
register: Arc::new({
let v = value.clone();
let v = WithLogs(v);
WithThrottle(v, ThrottleParams::new(Duration::from_secs(10)))
}),
query: Arc::new({
let v = value.clone();
let v = WithLogs(v);
WithThrottle(v, ThrottleParams::new(Duration::from_secs(10)))
}),
submit: Arc::new({
let v = value.clone();
WithLogs(v)
}),
canister: c,
registry: None,
}
}

pub fn with_metrics(mut self, r: &'a Registry) -> Self {
self.registry = Some(r);
self
}

pub fn build(self) -> CanisterMethods {
CanisterMethods {
register: {
let v = self.canister.clone();
let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "register")));
let v = WithThrottle(v, ThrottleParams::new(Duration::from_secs(10)));
Arc::new(v)
},
query: {
let v = self.canister.clone();
let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "query")));
let v = WithThrottle(v, ThrottleParams::new(Duration::from_secs(10)));
Arc::new(v)
},
submit: {
let v = self.canister.clone();
let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "submit")));
Arc::new(v)
},
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions rs/boundary_node/ic_boundary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use std::{
};

use anonymization_client::{
Canister as AnonymizationCanister, Track, Tracker as AnonymizationTracker,
Canister as AnonymizationCanister,
CanisterMethodsBuilder as AnonymizationCanisterMethodsBuilder, Track,
Tracker as AnonymizationTracker,
};
use anyhow::{anyhow, Context, Error};
use arc_swap::ArcSwapOption;
Expand Down Expand Up @@ -431,7 +433,10 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
// HTTP Logs Anonymization
let tracker = if let Some(v) = cli.obs.obs_log_anonymization_canister_id {
let canister = AnonymizationCanister::new(agent.clone().unwrap(), v);
Some(AnonymizationTracker::new(Box::new(OsRng), canister.into())?)
let cm = AnonymizationCanisterMethodsBuilder::new(canister)
.with_metrics(&metrics_registry)
.build();
Some(AnonymizationTracker::new(Box::new(OsRng), cm)?)
} else {
None
};
Expand Down

0 comments on commit 9a2466b

Please sign in to comment.