Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EASY] Add metrics for logging native price requests #3006

Merged
merged 6 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions crates/shared/src/price_estimation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
model::order::{BuyTokenDestination, OrderKind, SellTokenSource},
num::BigRational,
number::nonzero::U256 as NonZeroU256,
prometheus::IntCounterVec,
rate_limit::{RateLimiter, Strategy},
reqwest::Url,
serde::{Deserialize, Serialize},
Expand Down Expand Up @@ -519,6 +520,27 @@ pub fn amounts_to_price(sell_amount: U256, buy_amount: U256) -> Option<BigRation
))
}

#[derive(prometheus_metric_storage::MetricStorage)]
pub(in crate::price_estimation) struct Metrics {
m-lord-renkse marked this conversation as resolved.
Show resolved Hide resolved
/// number of requests done by each estimator
m-lord-renkse marked this conversation as resolved.
Show resolved Hide resolved
#[metric(labels("requests"))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[metric(labels("requests"))]
#[metric(labels("estimator"))]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unifying it with the same naming in InstrumentedPriceEstimator to be consistent.

native_price_requests: IntCounterVec,
}

impl Metrics {
fn get() -> &'static Metrics {
Metrics::instance(observe::metrics::get_storage_registry())
.expect("unexpected error getting metrics instance")
}

pub(in crate::price_estimation) fn inc_estimator(estimator: &str) {
m-lord-renkse marked this conversation as resolved.
Show resolved Hide resolved
Metrics::get()
.native_price_requests
.with_label_values(&[estimator])
.inc_by(1);
}
}

pub const HEALTHY_PRICE_ESTIMATION_TIME: Duration = Duration::from_millis(5_000);

pub async fn rate_limited<T>(
Expand Down
2 changes: 2 additions & 0 deletions crates/shared/src/price_estimation/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use {
crate::price_estimation::{
native::{NativePriceEstimateResult, NativePriceEstimating},
Metrics,
PriceEstimationError,
},
anyhow::anyhow,
Expand Down Expand Up @@ -91,6 +92,7 @@ where
token: H160,
) -> futures::future::BoxFuture<'_, NativePriceEstimateResult> {
async move {
Metrics::inc_estimator("buffered");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would become problematic as soon as there are multiple different APIs that support buffering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are definitely right, let's add it to CoinGecko

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw if you use the InstrumentedPriceEstimator wrapper thingy you don't have to add anything to coingecko. You just wrap it in that struct and it starts instrumenting it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's what I am doing, but I am trying to do it as nicer as possible, because InstrumentedPriceEstimator is hard-coded for PriceEstimating

// We must subscribe before we send the request, so we get the `rx` pointing to
// the current memory point, this way we avoid losing the result for
// the corner case in which the request is sent between the `unbounded_send()`
Expand Down
5 changes: 4 additions & 1 deletion crates/shared/src/price_estimation/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
PriceEstimating,
Query,
},
crate::trade_finding::external::ExternalTradeFinder,
crate::{price_estimation::Metrics, trade_finding::external::ExternalTradeFinder},
ethrpc::block_stream::CurrentBlockWatcher,
rate_limit::RateLimiter,
reqwest::{Client, Url},
Expand All @@ -17,6 +17,7 @@ pub struct ExternalPriceEstimator(TradeEstimator);

impl ExternalPriceEstimator {
pub fn new(
name: &str,
driver: Url,
client: Client,
rate_limiter: Arc<RateLimiter>,
Expand All @@ -32,6 +33,7 @@ impl ExternalPriceEstimator {
)),
rate_limiter,
driver.to_string(),
name,
))
}

Expand All @@ -42,6 +44,7 @@ impl ExternalPriceEstimator {

impl PriceEstimating for ExternalPriceEstimator {
fn estimate(&self, query: Arc<Query>) -> futures::future::BoxFuture<'_, PriceEstimateResult> {
Metrics::inc_estimator(self.0.name());
self.0.estimate(query)
}
}
3 changes: 3 additions & 0 deletions crates/shared/src/price_estimation/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl<'a> PriceEstimatorFactory<'a> {
fn get_estimator(&mut self, solver: &ExternalSolver) -> Result<&EstimatorEntry> {
let params = ExternalEstimatorParams {
driver: solver.url.clone(),
name: solver.name.clone(),
timeout: self.args.quote_timeout,
};
if !self.estimators.contains_key(&solver.name) {
Expand Down Expand Up @@ -374,6 +375,7 @@ trait PriceEstimatorCreating: Sized {
#[derive(Debug, Clone)]
struct ExternalEstimatorParams {
driver: Url,
name: String,
timeout: std::time::Duration,
}

Expand All @@ -382,6 +384,7 @@ impl PriceEstimatorCreating for ExternalPriceEstimator {

fn init(factory: &PriceEstimatorFactory, name: &str, params: Self::Params) -> Result<Self> {
Ok(Self::new(
params.name.as_str(),
params.driver,
factory.components.http_factory.create(),
factory.rate_limiter(name),
Expand Down
3 changes: 2 additions & 1 deletion crates/shared/src/price_estimation/native/coingecko.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::{NativePriceEstimateResult, NativePriceEstimating},
crate::{
price_estimation::{buffered::NativePriceBatchFetching, PriceEstimationError},
price_estimation::{buffered::NativePriceBatchFetching, Metrics, PriceEstimationError},
token_info::{TokenInfo, TokenInfoFetching},
},
anyhow::{anyhow, Context, Result},
Expand Down Expand Up @@ -233,6 +233,7 @@ impl NativePriceBatchFetching for CoinGecko {
impl NativePriceEstimating for CoinGecko {
fn estimate_native_price(&self, token: Token) -> BoxFuture<'_, NativePriceEstimateResult> {
async move {
Metrics::inc_estimator("CoinGecko");
let prices = self.fetch_native_prices(HashSet::from([token])).await?;
prices
.get(&token)
Expand Down
6 changes: 5 additions & 1 deletion crates/shared/src/price_estimation/native/oneinch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use {
super::{NativePrice, NativePriceEstimateResult, NativePriceEstimating},
crate::{price_estimation::PriceEstimationError, token_info::TokenInfoFetching},
crate::{
price_estimation::{Metrics, PriceEstimationError},
token_info::TokenInfoFetching,
},
anyhow::{anyhow, Context, Result},
ethrpc::block_stream::{into_stream, CurrentBlockWatcher},
futures::{future::BoxFuture, FutureExt, StreamExt},
Expand Down Expand Up @@ -90,6 +93,7 @@ impl OneInch {
impl NativePriceEstimating for OneInch {
fn estimate_native_price(&self, token: Token) -> BoxFuture<'_, NativePriceEstimateResult> {
async move {
Metrics::inc_estimator("OneInch");
let prices = self.prices.lock().unwrap();
prices
.get(&token)
Expand Down
18 changes: 8 additions & 10 deletions crates/shared/src/price_estimation/trade_finder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ use {

/// A `TradeFinding`-based price estimator with request sharing and rate
/// limiting.
#[derive(Clone)]
pub struct TradeEstimator {
inner: Arc<Inner>,
sharing: RequestSharing<Arc<Query>, BoxFuture<'static, Result<Estimate, PriceEstimationError>>>,
rate_limiter: Arc<RateLimiter>,
name: String,
}

#[derive(Clone)]
Expand All @@ -41,6 +43,7 @@ impl TradeEstimator {
finder: Arc<dyn TradeFinding>,
rate_limiter: Arc<RateLimiter>,
label: String,
name: &str,
) -> Self {
Self {
inner: Arc::new(Inner {
Expand All @@ -49,9 +52,14 @@ impl TradeEstimator {
}),
sharing: RequestSharing::labelled(format!("estimator_{}", label)),
rate_limiter,
name: name.to_string(),
}
}

pub fn name(&self) -> &str {
self.name.as_str()
}

pub fn with_verifier(mut self, verifier: Arc<dyn TradeVerifying>) -> Self {
self.inner = Arc::new(Inner {
verifier: Some(verifier),
Expand Down Expand Up @@ -104,16 +112,6 @@ impl Inner {
}
}

impl Clone for TradeEstimator {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this was done because it required in the pass a specific Clone implementation. But this currently does what the #[derive(Clone)] should do.

fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
sharing: self.sharing.clone(),
rate_limiter: self.rate_limiter.clone(),
}
}
}

impl PriceEstimating for TradeEstimator {
fn estimate(&self, query: Arc<Query>) -> futures::future::BoxFuture<'_, PriceEstimateResult> {
self.estimate(query).boxed()
Expand Down
Loading