Skip to content

Commit

Permalink
Remove unsynchronised run loop (#3050)
Browse files Browse the repository at this point in the history
# Description
Since now all the e2e tests are working in sync-to-blockchain mode (see
#3040) , we can remove the
unsynchronized mode.

# Changes
- Remove unsynchronized mode

## How to test
1. Regression tests
  • Loading branch information
m-lord-renkse authored Oct 11, 2024
1 parent 068bb9f commit 74eed7b
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 147 deletions.
22 changes: 0 additions & 22 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ pub struct Arguments {
#[clap(long, env, default_value = "0")]
pub limit_order_price_factor: f64,

/// The time between auction updates.
#[clap(long, env, default_value = "10s", value_parser = humantime::parse_duration)]
pub auction_update_interval: Duration,

/// The URL of a list of tokens our settlement contract is willing to
/// internalize.
#[clap(long, env)]
Expand Down Expand Up @@ -218,10 +214,6 @@ pub struct Arguments {
#[clap(long, env, use_value_delimiter = true)]
pub cow_amm_configs: Vec<CowAmmConfig>,

/// Controls start of the run loop.
#[clap(long, env, default_value = "unsynchronized")]
pub run_loop_mode: RunLoopMode,

/// If a new run loop would start more than this amount of time after the
/// system noticed the latest block, wait for the next block to appear
/// before continuing the run loop.
Expand Down Expand Up @@ -279,11 +271,9 @@ impl std::fmt::Display for Arguments {
db_url,
insert_batch_size,
native_price_estimation_results_required,
auction_update_interval,
max_settlement_transaction_wait,
s3,
cow_amm_configs,
run_loop_mode,
max_run_loop_delay,
run_loop_native_price_timeout,
max_winners_per_auction,
Expand Down Expand Up @@ -352,15 +342,13 @@ impl std::fmt::Display for Arguments {
"native_price_estimation_results_required: {}",
native_price_estimation_results_required
)?;
writeln!(f, "auction_update_interval: {:?}", auction_update_interval)?;
writeln!(
f,
"max_settlement_transaction_wait: {:?}",
max_settlement_transaction_wait
)?;
writeln!(f, "s3: {:?}", s3)?;
writeln!(f, "cow_amm_configs: {:?}", cow_amm_configs)?;
writeln!(f, "run_loop_mode: {:?}", run_loop_mode)?;
writeln!(f, "max_run_loop_delay: {:?}", max_run_loop_delay)?;
writeln!(
f,
Expand Down Expand Up @@ -529,16 +517,6 @@ impl FromStr for CowAmmConfig {
}
}

/// Controls the timing of the run loop.
#[derive(clap::Parser, clap::ValueEnum, Clone, Debug, Default, Copy)]
pub enum RunLoopMode {
/// The run loop starts with the next mined block.
SyncToBlockchain,
/// The run loop starts whenever the previous loop ends.
#[default]
Unsynchronized,
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
65 changes: 3 additions & 62 deletions crates/autopilot/src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
crate::{
arguments::RunLoopMode,
boundary::events::settlement::{GPv2SettlementContract, Indexer},
database::{
onchain_order_events::{
Expand All @@ -11,26 +10,22 @@ use {
Postgres,
},
event_updater::EventUpdater,
solvable_orders::SolvableOrdersCache,
},
anyhow::Result,
ethrpc::block_stream::{into_stream, BlockInfo, CurrentBlockWatcher},
futures::StreamExt,
ethrpc::block_stream::BlockInfo,
prometheus::{
core::{AtomicU64, GenericGauge},
HistogramVec,
IntCounterVec,
},
shared::maintenance::Maintaining,
std::{future::Future, sync::Arc, time::Duration},
tokio::{sync::Mutex, time::timeout},
std::{future::Future, sync::Arc},
tokio::sync::Mutex,
};

/// Coordinates all the updates that need to run a new block
/// to ensure a consistent view of the system.
pub struct Maintenance {
/// Set of orders that make up the current auction.
orders_cache: Arc<SolvableOrdersCache>,
/// Indexes and persists all events emited by the settlement contract.
settlement_indexer: EventUpdater<Indexer, GPv2SettlementContract>,
/// Indexes ethflow orders (orders selling native ETH).
Expand All @@ -46,12 +41,10 @@ pub struct Maintenance {

impl Maintenance {
pub fn new(
orders_cache: Arc<SolvableOrdersCache>,
settlement_indexer: EventUpdater<Indexer, GPv2SettlementContract>,
db_cleanup: Postgres,
) -> Self {
Self {
orders_cache,
settlement_indexer,
db_cleanup,
cow_amm_indexer: Default::default(),
Expand Down Expand Up @@ -126,58 +119,6 @@ impl Maintenance {
Ok(())
}

/// Spawns a background task that runs on every new block but also
/// at least after every `update_interval`.
pub fn spawn_background_task(
self_: Arc<Self>,
run_loop_mode: RunLoopMode,
current_block: CurrentBlockWatcher,
update_interval: Duration,
) {
tokio::task::spawn(async move {
match run_loop_mode {
RunLoopMode::SyncToBlockchain => {
// Update last seen block metric only since everything else will be updated
// inside the runloop.
let mut stream = into_stream(current_block);
loop {
let next_update = timeout(update_interval, stream.next());
match next_update.await {
Ok(Some(block)) => {
metrics().last_seen_block.set(block.number);
}
Ok(None) => break,
Err(_timeout) => {}
};
}
}
RunLoopMode::Unsynchronized => {
let mut latest_block = *current_block.borrow();
let mut stream = into_stream(current_block);
loop {
let next_update = timeout(update_interval, stream.next());
let current_block = match next_update.await {
Ok(Some(block)) => {
metrics().last_seen_block.set(block.number);
block
}
Ok(None) => break,
Err(_timeout) => latest_block,
};
if let Err(err) = self_.update_inner().await {
tracing::warn!(?err, "failed to run background task successfully");
}
if let Err(err) = self_.orders_cache.update(current_block.number).await {
tracing::warn!(?err, "failed to update auction successfully");
}
latest_block = current_block;
}
panic!("block stream terminated unexpectedly");
}
}
});
}

/// Runs the future and collects runtime metrics.
async fn timed_future<T>(label: &str, fut: impl Future<Output = T>) -> T {
let _timer = metrics()
Expand Down
10 changes: 2 additions & 8 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,7 @@ pub async fn run(args: Arguments) {
let trusted_tokens =
AutoUpdatingTokenList::from_configuration(market_makable_token_list_configuration).await;

let mut maintenance = Maintenance::new(
solvable_orders_cache.clone(),
settlement_event_indexer,
db.clone(),
);
let mut maintenance = Maintenance::new(settlement_event_indexer, db.clone());
maintenance.with_cow_amms(&cow_amm_registry);

if let Some(ethflow_contract) = args.ethflow_contract {
Expand Down Expand Up @@ -530,7 +526,6 @@ pub async fn run(args: Arguments) {
submission_deadline: args.submission_deadline as u64,
max_settlement_transaction_wait: args.max_settlement_transaction_wait,
solve_deadline: args.solve_deadline,
synchronization: args.run_loop_mode,
max_run_loop_delay: args.max_run_loop_delay,
max_winners_per_auction: args.max_winners_per_auction,
};
Expand All @@ -554,7 +549,7 @@ pub async fn run(args: Arguments) {
liveness.clone(),
Arc::new(maintenance),
);
run.run_forever(args.auction_update_interval).await;
run.run_forever().await;
unreachable!("run loop exited");
}

Expand Down Expand Up @@ -625,7 +620,6 @@ async fn shadow_mode(args: Arguments) -> ! {
trusted_tokens,
args.solve_deadline,
liveness.clone(),
args.run_loop_mode,
current_block,
args.max_winners_per_auction,
);
Expand Down
74 changes: 29 additions & 45 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
crate::{
arguments::RunLoopMode,
database::competition::Competition,
domain::{
self,
Expand Down Expand Up @@ -47,7 +46,6 @@ pub struct Config {
pub submission_deadline: u64,
pub max_settlement_transaction_wait: Duration,
pub solve_deadline: Duration,
pub synchronization: RunLoopMode,
/// How much time past observing the current block the runloop is
/// allowed to start before it has to re-synchronize to the blockchain
/// by waiting for the next block to appear.
Expand Down Expand Up @@ -105,14 +103,7 @@ impl RunLoop {
}
}

pub async fn run_forever(self, update_interval: Duration) -> ! {
Maintenance::spawn_background_task(
self.maintenance.clone(),
self.config.synchronization,
self.eth.current_block().clone(),
update_interval,
);

pub async fn run_forever(self) -> ! {
let mut last_auction = None;
let mut last_block = None;
let self_arc = Arc::new(self);
Expand All @@ -138,44 +129,37 @@ impl RunLoop {
prev_block: &mut Option<H256>,
) -> Option<domain::Auction> {
// wait for appropriate time to start building the auction
let start_block = match self.config.synchronization {
RunLoopMode::Unsynchronized => {
// Sleep a bit to avoid busy loops.
tokio::time::sleep(std::time::Duration::from_millis(1_000)).await;
*self.eth.current_block().borrow()
}
RunLoopMode::SyncToBlockchain => {
let current_block = *self.eth.current_block().borrow();
let time_since_last_block = current_block.observed_at.elapsed();
let auction_block = if time_since_last_block > self.config.max_run_loop_delay {
if prev_block.is_some_and(|prev_block| prev_block != current_block.hash) {
// don't emit warning if we finished prev run loop within the same block
tracing::warn!(
missed_by = ?time_since_last_block - self.config.max_run_loop_delay,
"missed optimal auction start, wait for new block"
);
}
ethrpc::block_stream::next_block(self.eth.current_block()).await
} else {
current_block
};
let start_block = {
let current_block = *self.eth.current_block().borrow();
let time_since_last_block = current_block.observed_at.elapsed();
let auction_block = if time_since_last_block > self.config.max_run_loop_delay {
if prev_block.is_some_and(|prev_block| prev_block != current_block.hash) {
// don't emit warning if we finished prev run loop within the same block
tracing::warn!(
missed_by = ?time_since_last_block - self.config.max_run_loop_delay,
"missed optimal auction start, wait for new block"
);
}
ethrpc::block_stream::next_block(self.eth.current_block()).await
} else {
current_block
};

self.run_maintenance(&auction_block).await;
match self
.solvable_orders_cache
.update(auction_block.number)
.await
{
Ok(()) => {
self.solvable_orders_cache.track_auction_update("success");
}
Err(err) => {
self.solvable_orders_cache.track_auction_update("failure");
tracing::warn!(?err, "failed to update auction");
}
self.run_maintenance(&auction_block).await;
match self
.solvable_orders_cache
.update(auction_block.number)
.await
{
Ok(()) => {
self.solvable_orders_cache.track_auction_update("success");
}
Err(err) => {
self.solvable_orders_cache.track_auction_update("failure");
tracing::warn!(?err, "failed to update auction");
}
auction_block
}
auction_block
};

let auction = self.cut_auction().await?;
Expand Down
11 changes: 3 additions & 8 deletions crates/autopilot/src/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use {
crate::{
arguments::RunLoopMode,
domain::{self, competition::TradedOrder},
infra::{
self,
Expand Down Expand Up @@ -41,20 +40,17 @@ pub struct RunLoop {
block: u64,
solve_deadline: Duration,
liveness: Arc<Liveness>,
synchronization: RunLoopMode,
current_block: CurrentBlockWatcher,
max_winners_per_auction: usize,
}

impl RunLoop {
#[allow(clippy::too_many_arguments)]
pub fn new(
orderbook: infra::shadow::Orderbook,
drivers: Vec<Arc<infra::Driver>>,
trusted_tokens: AutoUpdatingTokenList,
solve_deadline: Duration,
liveness: Arc<Liveness>,
synchronization: RunLoopMode,
current_block: CurrentBlockWatcher,
max_winners_per_auction: usize,
) -> Self {
Expand All @@ -70,7 +66,6 @@ impl RunLoop {
block: 0,
solve_deadline,
liveness,
synchronization,
current_block,
max_winners_per_auction,
}
Expand All @@ -79,9 +74,9 @@ impl RunLoop {
pub async fn run_forever(mut self) -> ! {
let mut previous = None;
loop {
if let RunLoopMode::SyncToBlockchain = self.synchronization {
let _ = ethrpc::block_stream::next_block(&self.current_block).await;
};
// We use this as a synchronization mechanism to sync the run loop starts with
// the next mined block
let _ = ethrpc::block_stream::next_block(&self.current_block).await;
let Some(auction) = self.next_auction().await else {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
Expand Down
2 changes: 0 additions & 2 deletions crates/e2e/src/setup/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ impl<'a> Services<'a> {

let args = [
"autopilot".to_string(),
"--run-loop-mode=sync-to-blockchain".to_string(),
"--max-run-loop-delay=100ms".to_string(),
"--auction-update-interval=1s".to_string(),
"--run-loop-native-price-timeout=500ms".to_string(),
format!("--ethflow-contract={:?}", self.contracts.ethflow.address()),
"--skip-event-sync=true".to_string(),
Expand Down

0 comments on commit 74eed7b

Please sign in to comment.