Skip to content

Commit

Permalink
Add metrics for logging native price requests
Browse files Browse the repository at this point in the history
  • Loading branch information
m-lord-renkse committed Sep 19, 2024
1 parent 61ab6aa commit 9311449
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 13 deletions.
21 changes: 21 additions & 0 deletions crates/shared/src/price_estimation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
model::order::{BuyTokenDestination, OrderKind, SellTokenSource},
num::BigRational,
number::nonzero::U256 as NonZeroU256,
prometheus::IntCounterVec,
rate_limit::{RateLimiter, Strategy},
reqwest::Url,
serde::{Deserialize, Serialize},
Expand Down Expand Up @@ -519,6 +520,26 @@ pub fn amounts_to_price(sell_amount: U256, buy_amount: U256) -> Option<BigRation
))
}

#[derive(prometheus_metric_storage::MetricStorage)]
pub(in crate::price_estimation) struct Metrics {
/// number of requests done by each estimator
native_price_requests: IntCounterVec,
}

impl Metrics {
fn get() -> &'static Metrics {
Metrics::instance(observe::metrics::get_storage_registry())
.expect("unexpected error getting metrics instance")
}

pub(in crate::price_estimation) fn inc_estimator(estimator: &str) {
Metrics::get()
.native_price_requests
.with_label_values(&[estimator])
.inc_by(1);
}
}

pub const HEALTHY_PRICE_ESTIMATION_TIME: Duration = Duration::from_millis(5_000);

pub async fn rate_limited<T>(
Expand Down
2 changes: 2 additions & 0 deletions crates/shared/src/price_estimation/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use {
crate::price_estimation::{
native::{NativePriceEstimateResult, NativePriceEstimating},
Metrics,
PriceEstimationError,
},
anyhow::anyhow,
Expand Down Expand Up @@ -91,6 +92,7 @@ where
token: H160,
) -> futures::future::BoxFuture<'_, NativePriceEstimateResult> {
async move {
Metrics::inc_estimator("buffered");
// We must subscribe before we send the request, so we get the `rx` pointing to
// the current memory point, this way we avoid losing the result for
// the corner case in which the request is sent between the `unbounded_send()`
Expand Down
5 changes: 4 additions & 1 deletion crates/shared/src/price_estimation/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
PriceEstimating,
Query,
},
crate::trade_finding::external::ExternalTradeFinder,
crate::{price_estimation::Metrics, trade_finding::external::ExternalTradeFinder},
ethrpc::block_stream::CurrentBlockWatcher,
rate_limit::RateLimiter,
reqwest::{Client, Url},
Expand All @@ -17,6 +17,7 @@ pub struct ExternalPriceEstimator(TradeEstimator);

impl ExternalPriceEstimator {
pub fn new(
name: &str,
driver: Url,
client: Client,
rate_limiter: Arc<RateLimiter>,
Expand All @@ -32,6 +33,7 @@ impl ExternalPriceEstimator {
)),
rate_limiter,
driver.to_string(),
name,
))
}

Expand All @@ -42,6 +44,7 @@ impl ExternalPriceEstimator {

impl PriceEstimating for ExternalPriceEstimator {
fn estimate(&self, query: Arc<Query>) -> futures::future::BoxFuture<'_, PriceEstimateResult> {
Metrics::inc_estimator(self.0.name());
self.0.estimate(query)
}
}
3 changes: 3 additions & 0 deletions crates/shared/src/price_estimation/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl<'a> PriceEstimatorFactory<'a> {
fn get_estimator(&mut self, solver: &ExternalSolver) -> Result<&EstimatorEntry> {
let params = ExternalEstimatorParams {
driver: solver.url.clone(),
name: solver.name.clone(),
timeout: self.args.quote_timeout,
};
if !self.estimators.contains_key(&solver.name) {
Expand Down Expand Up @@ -374,6 +375,7 @@ trait PriceEstimatorCreating: Sized {
#[derive(Debug, Clone)]
struct ExternalEstimatorParams {
driver: Url,
name: String,
timeout: std::time::Duration,
}

Expand All @@ -382,6 +384,7 @@ impl PriceEstimatorCreating for ExternalPriceEstimator {

fn init(factory: &PriceEstimatorFactory, name: &str, params: Self::Params) -> Result<Self> {
Ok(Self::new(
params.name.as_str(),
params.driver,
factory.components.http_factory.create(),
factory.rate_limiter(name),
Expand Down
3 changes: 2 additions & 1 deletion crates/shared/src/price_estimation/native/coingecko.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::{NativePriceEstimateResult, NativePriceEstimating},
crate::{
price_estimation::{buffered::NativePriceBatchFetching, PriceEstimationError},
price_estimation::{buffered::NativePriceBatchFetching, Metrics, PriceEstimationError},
token_info::{TokenInfo, TokenInfoFetching},
},
anyhow::{anyhow, Context, Result},
Expand Down Expand Up @@ -233,6 +233,7 @@ impl NativePriceBatchFetching for CoinGecko {
impl NativePriceEstimating for CoinGecko {
fn estimate_native_price(&self, token: Token) -> BoxFuture<'_, NativePriceEstimateResult> {
async move {
Metrics::inc_estimator("CoinGecko");
let prices = self.fetch_native_prices(HashSet::from([token])).await?;
prices
.get(&token)
Expand Down
6 changes: 5 additions & 1 deletion crates/shared/src/price_estimation/native/oneinch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use {
super::{NativePrice, NativePriceEstimateResult, NativePriceEstimating},
crate::{price_estimation::PriceEstimationError, token_info::TokenInfoFetching},
crate::{
price_estimation::{Metrics, PriceEstimationError},
token_info::TokenInfoFetching,
},
anyhow::{anyhow, Context, Result},
ethrpc::block_stream::{into_stream, CurrentBlockWatcher},
futures::{future::BoxFuture, FutureExt, StreamExt},
Expand Down Expand Up @@ -90,6 +93,7 @@ impl OneInch {
impl NativePriceEstimating for OneInch {
fn estimate_native_price(&self, token: Token) -> BoxFuture<'_, NativePriceEstimateResult> {
async move {
Metrics::inc_estimator("OneInch");
let prices = self.prices.lock().unwrap();
prices
.get(&token)
Expand Down
18 changes: 8 additions & 10 deletions crates/shared/src/price_estimation/trade_finder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ use {

/// A `TradeFinding`-based price estimator with request sharing and rate
/// limiting.
#[derive(Clone)]
pub struct TradeEstimator {
inner: Arc<Inner>,
sharing: RequestSharing<Arc<Query>, BoxFuture<'static, Result<Estimate, PriceEstimationError>>>,
rate_limiter: Arc<RateLimiter>,
name: String,
}

#[derive(Clone)]
Expand All @@ -41,6 +43,7 @@ impl TradeEstimator {
finder: Arc<dyn TradeFinding>,
rate_limiter: Arc<RateLimiter>,
label: String,
name: &str,
) -> Self {
Self {
inner: Arc::new(Inner {
Expand All @@ -49,9 +52,14 @@ impl TradeEstimator {
}),
sharing: RequestSharing::labelled(format!("estimator_{}", label)),
rate_limiter,
name: name.to_string(),
}
}

pub fn name(&self) -> &str {
self.name.as_str()
}

pub fn with_verifier(mut self, verifier: Arc<dyn TradeVerifying>) -> Self {
self.inner = Arc::new(Inner {
verifier: Some(verifier),
Expand Down Expand Up @@ -104,16 +112,6 @@ impl Inner {
}
}

impl Clone for TradeEstimator {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
sharing: self.sharing.clone(),
rate_limiter: self.rate_limiter.clone(),
}
}
}

impl PriceEstimating for TradeEstimator {
fn estimate(&self, query: Arc<Query>) -> futures::future::BoxFuture<'_, PriceEstimateResult> {
self.estimate(query).boxed()
Expand Down

0 comments on commit 9311449

Please sign in to comment.