Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(BOUN-1300): Add metrics to anonymization client #3053

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/boundary_node/anonymization/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ DEPENDENCIES = [
"//rs/types/types",
"@crate_index//:anyhow",
"@crate_index//:candid",
"@crate_index//:prometheus",
"@crate_index//:rand",
"@crate_index//:rsa",
"@crate_index//:thiserror",
Expand Down
1 change: 1 addition & 0 deletions rs/boundary_node/anonymization/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ async-trait = { workspace = true }
candid = { workspace = true }
ic-canister-client = { path = "../../../canister_client" }
ic-types = { path = "../../../types/types" }
prometheus = { workspace = true }
rand = { workspace = true }
rsa = { workspace = true }
thiserror = { workspace = true }
Expand Down
154 changes: 134 additions & 20 deletions rs/boundary_node/anonymization/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use anyhow::{anyhow, Context, Error};
use async_trait::async_trait;
use candid::{Decode, Encode, Principal};
use ic_canister_client::Agent;
use prometheus::{
register_histogram_vec_with_registry, register_int_counter_vec_with_registry, HistogramOpts,
HistogramVec, IntCounterVec, Registry,
};
use rsa::{
pkcs1::{DecodeRsaPublicKey, EncodeRsaPublicKey},
rand_core::CryptoRngCore,
Expand All @@ -20,7 +24,58 @@ use tokio::{sync::Mutex, time::sleep};
const SALT_SIZE: usize = 64;
const RSA_KEY_SIZE: usize = 2048;

struct WithLogs<T>(T);
#[derive(Clone, Debug)]
pub struct MetricParams {
pub action: String,
pub counter: IntCounterVec,
pub recorder: HistogramVec,
}

impl MetricParams {
pub fn new(registry: &Registry, action: &str) -> Self {
Self::new_with_opts(registry, action, &["status"], None)
}

pub fn new_with_opts(
registry: &Registry,
action: &str,
labels: &[&str],
buckets: Option<&[f64]>,
) -> Self {
let mut recorder_opts = HistogramOpts::new(
format!("{action}_duration_sec"), // name
format!("Records the duration of {action} calls in seconds"), // description
);

// Set histogram buckets if given
buckets.inspect(|bs| {
recorder_opts.buckets = bs.to_vec();
});

Self {
action: action.to_string(),

// Count
counter: register_int_counter_vec_with_registry!(
format!("{action}_total"), // name
format!("Counts occurrences of {action} calls"), // description
labels, // labels
registry, // registry
)
.expect("failed to register counter"),

// Duration
recorder: register_histogram_vec_with_registry!(
recorder_opts, // options
labels, // labels
registry, // registry
)
.expect("failed to register histogram"),
}
}
}

struct WithMetrics<T>(T, Option<MetricParams>);

pub struct ThrottleParams {
pub d: Duration,
Expand Down Expand Up @@ -92,7 +147,7 @@ pub trait Register: Sync + Send {
}

#[async_trait]
impl<T: Register> Register for WithLogs<T> {
impl<T: Register> Register for WithMetrics<T> {
async fn register(&self, pubkey: &[u8]) -> Result<(), RegisterError> {
let start_time = Instant::now();

Expand All @@ -108,11 +163,24 @@ impl<T: Register> Register for WithLogs<T> {

let duration = start_time.elapsed().as_secs_f64();

// Log
println!(
"action = 'register', status = {status}, duration = {duration}, error = {:?}",
out.as_ref().err()
);

// Metrics
if let Some(MetricParams {
counter, recorder, ..
}) = &self.1
{
// Count
counter.with_label_values(&[status]).inc();

// Latency
recorder.with_label_values(&[status]).observe(duration);
}

return out;
}
}
Expand Down Expand Up @@ -181,7 +249,7 @@ pub trait Query: Sync + Send {
}

#[async_trait]
impl<T: Query> Query for WithLogs<T> {
impl<T: Query> Query for WithMetrics<T> {
async fn query(&self) -> Result<Vec<u8>, QueryError> {
let start_time = Instant::now();

Expand All @@ -202,11 +270,24 @@ impl<T: Query> Query for WithLogs<T> {

let duration = start_time.elapsed().as_secs_f64();

// Log
println!(
"action = 'query', status = {status}, duration = {duration}, error = {:?}",
out.as_ref().err()
);

// Metrics
if let Some(MetricParams {
counter, recorder, ..
}) = &self.1
{
// Count
counter.with_label_values(&[status]).inc();

// Latency
recorder.with_label_values(&[status]).observe(duration);
}

return out;
}
}
Expand All @@ -226,7 +307,7 @@ pub trait Submit: Sync + Send {
}

#[async_trait]
impl<T: Submit> Submit for WithLogs<T> {
impl<T: Submit> Submit for WithMetrics<T> {
async fn submit(&self, vs: &[Pair]) -> Result<(), SubmitError> {
let start_time = Instant::now();

Expand All @@ -242,11 +323,24 @@ impl<T: Submit> Submit for WithLogs<T> {

let duration = start_time.elapsed().as_secs_f64();

// Log
println!(
"action = 'submit', status = {status}, duration = {duration}, error = {:?}",
out.as_ref().err()
);

// Metrics
if let Some(MetricParams {
counter, recorder, ..
}) = &self.1
{
// Count
counter.with_label_values(&[status]).inc();

// Latency
recorder.with_label_values(&[status]).observe(duration);
}

return out;
}
}
Expand Down Expand Up @@ -424,23 +518,43 @@ pub struct CanisterMethods {
submit: Arc<dyn Submit>,
}

impl From<Canister> for CanisterMethods {
fn from(value: Canister) -> Self {
pub struct CanisterMethodsBuilder<'a> {
canister: Canister,
registry: Option<&'a Registry>,
}

impl<'a> CanisterMethodsBuilder<'a> {
pub fn new(c: Canister) -> Self {
Self {
register: Arc::new({
let v = value.clone();
let v = WithLogs(v);
WithThrottle(v, ThrottleParams::new(Duration::from_secs(10)))
}),
query: Arc::new({
let v = value.clone();
let v = WithLogs(v);
WithThrottle(v, ThrottleParams::new(Duration::from_secs(10)))
}),
submit: Arc::new({
let v = value.clone();
WithLogs(v)
}),
canister: c,
registry: None,
}
}

pub fn with_metrics(mut self, r: &'a Registry) -> Self {
self.registry = Some(r);
self
}

pub fn build(self) -> CanisterMethods {
CanisterMethods {
register: {
let v = self.canister.clone();
let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "register")));
let v = WithThrottle(v, ThrottleParams::new(Duration::from_secs(10)));
Arc::new(v)
},
query: {
let v = self.canister.clone();
let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "query")));
let v = WithThrottle(v, ThrottleParams::new(Duration::from_secs(10)));
Arc::new(v)
},
submit: {
let v = self.canister.clone();
let v = WithMetrics(v, self.registry.map(|r| MetricParams::new(r, "submit")));
Arc::new(v)
},
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions rs/boundary_node/ic_boundary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use std::{
};

use anonymization_client::{
Canister as AnonymizationCanister, Track, Tracker as AnonymizationTracker,
Canister as AnonymizationCanister,
CanisterMethodsBuilder as AnonymizationCanisterMethodsBuilder, Track,
Tracker as AnonymizationTracker,
};
use anyhow::{anyhow, Context, Error};
use arc_swap::ArcSwapOption;
Expand Down Expand Up @@ -431,7 +433,10 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
// HTTP Logs Anonymization
let tracker = if let Some(v) = cli.obs.obs_log_anonymization_canister_id {
let canister = AnonymizationCanister::new(agent.clone().unwrap(), v);
Some(AnonymizationTracker::new(Box::new(OsRng), canister.into())?)
let cm = AnonymizationCanisterMethodsBuilder::new(canister)
.with_metrics(&metrics_registry)
.build();
Some(AnonymizationTracker::new(Box::new(OsRng), cm)?)
} else {
None
};
Expand Down
Loading