Skip to content

Commit

Permalink
feat: Esplora request rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Jun 28, 2024
1 parent 462cf73 commit c1fa08a
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 21 deletions.
4 changes: 4 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ ESPLORA_ENDPOINT=https://blockstream.info/liquid/api

# Poll interval for new blocks in seconds
ESPLORA_POLL_INTERVAL=10

# Max reqs/second for the Esplora endpoint; useful when hitting rate limits
# Set to 0 to disable
ESPLORA_MAX_REQUESTS_PER_SECOND=5
45 changes: 45 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ r2d2 = "0.8.10"
rayon = "1.10.0"
num_cpus = "1.16.0"
async-trait = "0.1.80"
ratelimit = "0.9.1"

[patch.crates-io]
secp256k1-zkp = { git = "https://github.com/BlockstreamResearch/rust-secp256k1-zkp.git", rev = "60e631c24588a0c9e271badd61959294848c665d" }
Expand Down
82 changes: 67 additions & 15 deletions src/chain/esplora.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use crossbeam_channel::{Receiver, Sender};
use elements::{Block, Transaction};
use log::{error, info, warn};
use log::{error, info, trace, warn};
use ratelimit::Ratelimiter;
use reqwest::Response;
use serde::de::DeserializeOwned;
use tokio::{task, time};

use crate::chain::types::{ChainBackend, NetworkInfo};

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct EsploraClient {
endpoint: String,
poll_interval: u64,

rate_limit: Option<Arc<Ratelimiter>>,

// To keep the channel alive
// This is just a stub; streaming transactions is not supported with Esplora
#[allow(dead_code)]
Expand All @@ -27,7 +31,11 @@ pub struct EsploraClient {
}

impl EsploraClient {
pub fn new(endpoint: String, poll_interval: u64) -> Self {
pub fn new(
endpoint: String,
poll_interval: u64,
max_reqs_per_second: u64,
) -> Result<Self, Box<dyn Error>> {
let trimmed_endpoint = match endpoint.strip_suffix("/") {
Some(s) => s.to_string(),
None => endpoint,
Expand All @@ -36,15 +44,32 @@ impl EsploraClient {
let (tx_sender, tx_receiver) = crossbeam_channel::bounded::<Transaction>(1);
let (block_sender, block_receiver) = crossbeam_channel::unbounded::<Block>();

EsploraClient {
let rate_limit: Option<Arc<Ratelimiter>>;

if max_reqs_per_second > 0 {
info!(
"Rate limiting requests at {} requests/second",
max_reqs_per_second
);
rate_limit = Some(Arc::new(
Ratelimiter::builder(max_reqs_per_second, Duration::from_secs(1))
.max_tokens(max_reqs_per_second)
.build()?,
));
} else {
info!("Not rate limiting");
rate_limit = None;
}

Ok(EsploraClient {
tx_sender,
rate_limit,
tx_receiver,
block_sender,
block_receiver,

poll_interval,
block_receiver,
endpoint: trimmed_endpoint,
}
})
}

pub fn connect(&self) {
Expand Down Expand Up @@ -91,6 +116,11 @@ impl EsploraClient {
continue;
}
};
trace!(
"Got block {} ({})",
block.header.height,
block.header.block_hash()
);
match clone.block_sender.send(block) {
Ok(_) => {}
Err(err) => {
Expand Down Expand Up @@ -163,12 +193,30 @@ impl EsploraClient {
req = req.body(body.unwrap())
}

self.wait_rate_limit();
let res = req.send().await?;
match res.error_for_status() {
Ok(res) => Ok(res),
Err(err) => Err(err),
}
}

fn wait_rate_limit(&self) {
if self.rate_limit.is_none() {
return;
}

loop {
match self.rate_limit.clone().unwrap().try_wait() {
Ok(_) => {
break;
}
Err(time) => {
std::thread::sleep(time);
}
}
}
}
}

#[async_trait]
Expand Down Expand Up @@ -228,34 +276,38 @@ mod esplora_client_test {
#[test]
fn test_trim_suffix() {
assert_eq!(
EsploraClient::new(ENDPOINT.to_string(), 0).endpoint,
EsploraClient::new(ENDPOINT.to_string(), 0, 0)
.unwrap()
.endpoint,
"https://blockstream.info/liquid/api"
);
assert_eq!(
EsploraClient::new("https://blockstream.info/liquid/api".to_string(), 0).endpoint,
EsploraClient::new("https://blockstream.info/liquid/api".to_string(), 0, 0)
.unwrap()
.endpoint,
"https://blockstream.info/liquid/api"
);
}

#[tokio::test]
async fn test_new() {
let client = EsploraClient::new(ENDPOINT.to_string(), 0);
let client = EsploraClient::new(ENDPOINT.to_string(), 0, 0).unwrap();

let info = client.get_network_info().await.unwrap();
assert_eq!(info.subversion, "Esplora");
}

#[tokio::test]
async fn test_get_block_count() {
let client = EsploraClient::new(ENDPOINT.to_string(), 0);
let client = EsploraClient::new(ENDPOINT.to_string(), 0, 0).unwrap();

let count = client.get_block_count().await.unwrap();
assert!(count > 2920403);
}

#[tokio::test]
async fn test_get_block_hash() {
let client = EsploraClient::new(ENDPOINT.to_string(), 0);
let client = EsploraClient::new(ENDPOINT.to_string(), 0, 0).unwrap();

let hash = client.get_block_hash(2920407).await.unwrap();
assert_eq!(
Expand All @@ -266,7 +318,7 @@ mod esplora_client_test {

#[tokio::test]
async fn test_get_block() {
let client = EsploraClient::new(ENDPOINT.to_string(), 0);
let client = EsploraClient::new(ENDPOINT.to_string(), 0, 0).unwrap();

let block_hash = "5510868513d80a64371cdedfef49327dc2cd452b32b93cbcd70ddeddcc7bef66";
let block = client.get_block(block_hash.to_string()).await.unwrap();
Expand All @@ -276,7 +328,7 @@ mod esplora_client_test {

#[tokio::test]
async fn test_get_transaction() {
let client = EsploraClient::new(ENDPOINT.to_string(), 0);
let client = EsploraClient::new(ENDPOINT.to_string(), 0, 0).unwrap();

let tx_hash = "dc2505641c10af5fe0ffd8f1bfc14e9608e73137009c69b6ee0d1fe8ce9784d6";
let block = client.get_transaction(tx_hash.to_string()).await.unwrap();
Expand All @@ -285,7 +337,7 @@ mod esplora_client_test {

#[tokio::test]
async fn test_get_transaction_not_found() {
let client = EsploraClient::new(ENDPOINT.to_string(), 0);
let client = EsploraClient::new(ENDPOINT.to_string(), 0, 0).unwrap();

let tx_hash = "not found";
let block = client.get_transaction(tx_hash.to_string()).await;
Expand Down
23 changes: 17 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::env;
use std::sync::Arc;

use crate::chain::esplora::EsploraClient;
use crate::chain::types::ChainBackend;
use dotenvy::dotenv;
use elements::AddressParams;
use log::{debug, error, info};

use crate::chain::types::ChainBackend;

mod api;
mod chain;
mod claimer;
Expand Down Expand Up @@ -110,15 +110,26 @@ async fn get_chain_backend() -> Arc<Box<dyn ChainBackend + Send + Sync>> {
}
}
"esplora" => {
let client = chain::esplora::EsploraClient::new(
match EsploraClient::new(
env::var("ESPLORA_ENDPOINT").expect("ESPLORA_ENDPOINT must be set"),
env::var("ESPLORA_POLL_INTERVAL")
.expect("ESPLORA_POLL_INTERVAL must be set")
.parse::<u64>()
.expect("ESPLORA_POLL_INTERVAL invalid"),
);
client.connect();
Box::new(client)
env::var("ESPLORA_MAX_REQUESTS_PER_SECOND")
.expect("ESPLORA_MAX_REQUESTS_PER_SECOND must be set")
.parse::<u64>()
.expect("ESPLORA_MAX_REQUESTS_PER_SECOND invalid"),
) {
Ok(client) => {
client.connect();
Box::new(client)
}
Err(err) => {
error!("Could not create Esplora client: {}", err);
std::process::exit(1);
}
}
}
&_ => {
error!("Unknown chain backend: {}", backend);
Expand Down

0 comments on commit c1fa08a

Please sign in to comment.