diff --git a/crates/driver/src/boundary/mod.rs b/crates/driver/src/boundary/mod.rs index 0e7cd2f395..c6aacc1f74 100644 --- a/crates/driver/src/boundary/mod.rs +++ b/crates/driver/src/boundary/mod.rs @@ -45,24 +45,26 @@ fn web3(eth: &Ethereum) -> Web3 { /// Builds a web3 client that buffers requests and sends them in a /// batch call. -pub fn buffered_web3_client(ethrpc: &Url) -> Web3 { - web3_client(ethrpc, 20, 10) +pub fn buffered_web3_client(ethrpc: &Url, ethrpc_args: &shared::ethrpc::Arguments) -> Web3 { + web3_client(ethrpc, ethrpc_args) } /// Builds a web3 client that sends requests one by one. pub fn unbuffered_web3_client(ethrpc: &Url) -> Web3 { - web3_client(ethrpc, 0, 0) + web3_client( + ethrpc, + &shared::ethrpc::Arguments { + ethrpc_max_batch_size: 0, + ethrpc_max_concurrent_requests: 0, + ethrpc_batch_delay: Default::default(), + }, + ) } -fn web3_client(ethrpc: &Url, max_batch_size: usize, max_concurrent_requests: usize) -> Web3 { - let ethrpc_args = shared::ethrpc::Arguments { - ethrpc_max_batch_size: max_batch_size, - ethrpc_max_concurrent_requests: max_concurrent_requests, - ethrpc_batch_delay: Default::default(), - }; +fn web3_client(ethrpc: &Url, ethrpc_args: &shared::ethrpc::Arguments) -> Web3 { let http_factory = shared::http_client::HttpClientFactory::new(&shared::http_client::Arguments { http_timeout: std::time::Duration::from_secs(10), }); - shared::ethrpc::web3(ðrpc_args, &http_factory, ethrpc, "base") + shared::ethrpc::web3(ethrpc_args, &http_factory, ethrpc, "base") } diff --git a/crates/driver/src/infra/blockchain/contracts.rs b/crates/driver/src/infra/blockchain/contracts.rs index 835ff66130..664793a7ed 100644 --- a/crates/driver/src/infra/blockchain/contracts.rs +++ b/crates/driver/src/infra/blockchain/contracts.rs @@ -66,9 +66,9 @@ impl Contracts { .0, ); - let archive_node_web3 = archive_node_url - .as_ref() - .map_or(web3.clone(), |url| boundary::buffered_web3_client(url)); + let archive_node_web3 = archive_node_url.as_ref().map_or(web3.clone(), |url| { + boundary::buffered_web3_client(url, &shared::ethrpc::Arguments::default()) + }); // todo: provide it from the config let mut cow_amm_registry = cow_amm::Registry::new(archive_node_web3); for config in addresses.cow_amms { cow_amm_registry diff --git a/crates/driver/src/infra/blockchain/mod.rs b/crates/driver/src/infra/blockchain/mod.rs index 9115d203a0..76fa77deac 100644 --- a/crates/driver/src/infra/blockchain/mod.rs +++ b/crates/driver/src/infra/blockchain/mod.rs @@ -27,8 +27,8 @@ pub struct Rpc { impl Rpc { /// Instantiate an RPC client to an Ethereum (or Ethereum-compatible) node /// at the specifed URL. - pub async fn new(url: &url::Url) -> Result { - let web3 = boundary::buffered_web3_client(url); + pub async fn new(url: &url::Url, args: &shared::ethrpc::Arguments) -> Result { + let web3 = boundary::buffered_web3_client(url, args); let chain = web3.eth().chain_id().await?.into(); Ok(Self { diff --git a/crates/driver/src/infra/cli.rs b/crates/driver/src/infra/cli.rs index 9159f990f7..f4c7dacd7b 100644 --- a/crates/driver/src/infra/cli.rs +++ b/crates/driver/src/infra/cli.rs @@ -21,6 +21,9 @@ pub struct Args { #[clap(long, env)] pub ethrpc: Url, + #[clap(flatten)] + pub ethrpc_args: shared::ethrpc::Arguments, + /// Path to the driver configuration file. This file should be in TOML /// format. For an example see /// https://github.com/cowprotocol/services/blob/main/crates/driver/example.toml. diff --git a/crates/driver/src/run.rs b/crates/driver/src/run.rs index 10e431208d..6eefa9c92c 100644 --- a/crates/driver/src/run.rs +++ b/crates/driver/src/run.rs @@ -125,7 +125,7 @@ fn simulator(config: &infra::Config, eth: &Ethereum) -> Simulator { } async fn ethrpc(args: &cli::Args) -> blockchain::Rpc { - blockchain::Rpc::new(&args.ethrpc) + blockchain::Rpc::new(&args.ethrpc, &args.ethrpc_args) .await .expect("connect ethereum RPC") } diff --git a/crates/driver/src/tests/cases/settle.rs b/crates/driver/src/tests/cases/settle.rs index 110d138734..c82b0bac03 100644 --- a/crates/driver/src/tests/cases/settle.rs +++ b/crates/driver/src/tests/cases/settle.rs @@ -7,6 +7,7 @@ use { setup::{ab_order, ab_pool, ab_solution}, }, }, + futures::future::join_all, web3::Transport, }; @@ -110,3 +111,50 @@ async fn high_gas_limit() { .unwrap(); test.settle(&id).await.ok().await; } + +#[tokio::test] +#[ignore] +async fn too_many_settle_calls() { + let test = tests::setup() + .allow_multiple_solve_requests() + .pool(ab_pool()) + .order(ab_order()) + .solution(ab_solution()) + .ethrpc_args(shared::ethrpc::Arguments { + ethrpc_max_batch_size: 10, + ethrpc_max_concurrent_requests: 10, + ethrpc_batch_delay: std::time::Duration::from_secs(1), + }) + .solve_deadline_timeout(chrono::Duration::seconds(4)) + .done() + .await; + + let id1 = test.solve().await.ok().id(); + let id2 = test.solve().await.ok().id(); + let id3 = test.solve().await.ok().id(); + let id4 = test.solve().await.ok().id(); + + assert_ne!(id1, id2); + assert_ne!(id2, id3); + assert_ne!(id1, id3); + assert_ne!(id1, id4); + + let results = join_all(vec![ + test.settle(&id1), + test.settle(&id2), + test.settle(&id3), + test.settle(&id4), + ]) + .await; + + for (index, result) in results.into_iter().enumerate() { + match index { + 0 => { + result.ok().await.ab_order_executed().await; + } + 1 | 2 => result.err().kind("FailedToSubmit"), + 3 => result.err().kind("QueueAwaitingDeadlineExceeded"), + _ => unreachable!(), + } + } +} diff --git a/crates/driver/src/tests/setup/driver.rs b/crates/driver/src/tests/setup/driver.rs index 6cf08b1cc6..2c2d288416 100644 --- a/crates/driver/src/tests/setup/driver.rs +++ b/crates/driver/src/tests/setup/driver.rs @@ -18,6 +18,7 @@ pub struct Config { pub enable_simulation: bool, pub mempools: Vec, pub order_priority_strategies: Vec, + pub ethrpc_args: Option, } pub struct Driver { @@ -40,7 +41,8 @@ impl Driver { } }; let (addr_sender, addr_receiver) = oneshot::channel(); - let args = vec![ + + let mut args = vec![ "/test/driver/path".to_owned(), "--addr".to_owned(), "0.0.0.0:0".to_owned(), @@ -49,6 +51,14 @@ impl Driver { "--config".to_owned(), config_file.to_str().unwrap().to_owned(), ]; + if let Some(ethrpc_arg) = &config.ethrpc_args { + args.push("--ethrpc-max-batch-size".to_owned()); + args.push(ethrpc_arg.ethrpc_max_batch_size.to_string()); + args.push("--ethrpc-max-concurrent-requests".to_owned()); + args.push(ethrpc_arg.ethrpc_max_concurrent_requests.to_string()); + args.push("--ethrpc-batch-delay".to_owned()); + args.push(ethrpc_arg.ethrpc_batch_delay.as_millis().to_string() + "ms"); + } tokio::spawn(crate::run(args.into_iter(), Some(addr_sender))); let addr = addr_receiver.await.unwrap(); Self { diff --git a/crates/driver/src/tests/setup/mod.rs b/crates/driver/src/tests/setup/mod.rs index 63458971b0..8b7e3c1442 100644 --- a/crates/driver/src/tests/setup/mod.rs +++ b/crates/driver/src/tests/setup/mod.rs @@ -499,6 +499,7 @@ pub fn setup() -> Setup { rpc_args: vec!["--gas-limit".into(), "10000000".into()], allow_multiple_solve_requests: false, auction_id: 1, + solve_deadline_timeout: chrono::Duration::seconds(2), ..Default::default() } } @@ -532,6 +533,9 @@ pub struct Setup { allow_multiple_solve_requests: bool, /// Auction ID used during tests auction_id: i64, + ethrpc_args: Option, + /// Auction solving deadline timeout + solve_deadline_timeout: chrono::Duration, } /// The validity of a solution. @@ -842,6 +846,11 @@ impl Setup { self } + pub fn ethrpc_args(mut self, ethrpc_args: shared::ethrpc::Arguments) -> Self { + self.ethrpc_args = Some(ethrpc_args); + self + } + /// Create the test: set up onchain contracts and pools, start a mock HTTP /// server for the solver and start the HTTP server for the driver. pub async fn done(self) -> Test { @@ -945,6 +954,7 @@ impl Setup { enable_simulation: self.enable_simulation, mempools: self.mempools, order_priority_strategies: self.order_priority_strategies, + ethrpc_args: self.ethrpc_args, }, &solvers_with_address, &blockchain, @@ -981,7 +991,12 @@ impl Setup { } fn deadline(&self) -> chrono::DateTime { - crate::infra::time::now() + chrono::Duration::seconds(2) + crate::infra::time::now() + self.solve_deadline_timeout + } + + pub fn solve_deadline_timeout(mut self, timeout: chrono::Duration) -> Self { + self.solve_deadline_timeout = timeout; + self } pub fn allow_multiple_solve_requests(mut self) -> Self { diff --git a/crates/driver/src/tests/setup/solver.rs b/crates/driver/src/tests/setup/solver.rs index 777e6416d2..9194ab8a58 100644 --- a/crates/driver/src/tests/setup/solver.rs +++ b/crates/driver/src/tests/setup/solver.rs @@ -381,7 +381,9 @@ impl Solver { .collect::>(); let url = config.blockchain.web3_url.parse().unwrap(); - let rpc = infra::blockchain::Rpc::new(&url).await.unwrap(); + let rpc = infra::blockchain::Rpc::new(&url, &shared::ethrpc::Arguments::default()) + .await + .unwrap(); let gas = Arc::new( infra::blockchain::GasPriceEstimator::new( rpc.web3(), diff --git a/crates/shared/src/ethrpc.rs b/crates/shared/src/ethrpc.rs index 8f2a0ab9c0..0e3b1d00eb 100644 --- a/crates/shared/src/ethrpc.rs +++ b/crates/shared/src/ethrpc.rs @@ -7,6 +7,7 @@ pub use ethrpc::{ }; use { crate::http_client::HttpClientFactory, + clap::Parser, reqwest::Url, std::{ fmt::{self, Display, Formatter}, @@ -17,12 +18,12 @@ use { pub const MAX_BATCH_SIZE: usize = 100; /// Command line arguments for the common Ethereum RPC connections. -#[derive(clap::Parser)] +#[derive(clap::Parser, Debug)] #[group(skip)] pub struct Arguments { /// Maximum batch size for Ethereum RPC requests. Use '0' to disable /// batching. - #[clap(long, env, default_value = "100")] + #[clap(long, env, default_value = "20")] pub ethrpc_max_batch_size: usize, /// Maximum number of concurrent requests to send to the node. Use '0' for @@ -36,6 +37,12 @@ pub struct Arguments { pub ethrpc_batch_delay: Duration, } +impl Default for Arguments { + fn default() -> Self { + Arguments::parse_from([""]) + } +} + impl Display for Arguments { fn fmt(&self, f: &mut Formatter) -> fmt::Result { let Self {