Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unsynchronised run loop #3050

Merged
merged 5 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,6 @@ pub struct Arguments {
#[clap(long, env, use_value_delimiter = true)]
pub cow_amm_configs: Vec<CowAmmConfig>,

/// Controls start of the run loop.
#[clap(long, env, default_value = "unsynchronized")]
pub run_loop_mode: RunLoopMode,

/// If a new run loop would start more than this amount of time after the
/// system noticed the latest block, wait for the next block to appear
/// before continuing the run loop.
Expand Down Expand Up @@ -283,7 +279,6 @@ impl std::fmt::Display for Arguments {
max_settlement_transaction_wait,
s3,
cow_amm_configs,
run_loop_mode,
max_run_loop_delay,
run_loop_native_price_timeout,
max_winners_per_auction,
Expand Down Expand Up @@ -360,7 +355,6 @@ impl std::fmt::Display for Arguments {
)?;
writeln!(f, "s3: {:?}", s3)?;
writeln!(f, "cow_amm_configs: {:?}", cow_amm_configs)?;
writeln!(f, "run_loop_mode: {:?}", run_loop_mode)?;
writeln!(f, "max_run_loop_delay: {:?}", max_run_loop_delay)?;
writeln!(
f,
Expand Down Expand Up @@ -529,16 +523,6 @@ impl FromStr for CowAmmConfig {
}
}

/// Controls the timing of the run loop.
#[derive(clap::Parser, clap::ValueEnum, Clone, Debug, Default, Copy)]
pub enum RunLoopMode {
/// The run loop starts with the next mined block.
SyncToBlockchain,
/// The run loop starts whenever the previous loop ends.
#[default]
Unsynchronized,
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
65 changes: 3 additions & 62 deletions crates/autopilot/src/maintenance.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
crate::{
arguments::RunLoopMode,
boundary::events::settlement::{GPv2SettlementContract, Indexer},
database::{
onchain_order_events::{
Expand All @@ -11,26 +10,22 @@ use {
Postgres,
},
event_updater::EventUpdater,
solvable_orders::SolvableOrdersCache,
},
anyhow::Result,
ethrpc::block_stream::{into_stream, BlockInfo, CurrentBlockWatcher},
futures::StreamExt,
ethrpc::block_stream::BlockInfo,
prometheus::{
core::{AtomicU64, GenericGauge},
HistogramVec,
IntCounterVec,
},
shared::maintenance::Maintaining,
std::{future::Future, sync::Arc, time::Duration},
tokio::{sync::Mutex, time::timeout},
std::{future::Future, sync::Arc},
tokio::sync::Mutex,
};

/// Coordinates all the updates that need to run a new block
/// to ensure a consistent view of the system.
pub struct Maintenance {
/// Set of orders that make up the current auction.
orders_cache: Arc<SolvableOrdersCache>,
/// Indexes and persists all events emited by the settlement contract.
settlement_indexer: EventUpdater<Indexer, GPv2SettlementContract>,
/// Indexes ethflow orders (orders selling native ETH).
Expand All @@ -46,12 +41,10 @@ pub struct Maintenance {

impl Maintenance {
pub fn new(
orders_cache: Arc<SolvableOrdersCache>,
settlement_indexer: EventUpdater<Indexer, GPv2SettlementContract>,
db_cleanup: Postgres,
) -> Self {
Self {
orders_cache,
settlement_indexer,
db_cleanup,
cow_amm_indexer: Default::default(),
Expand Down Expand Up @@ -126,58 +119,6 @@ impl Maintenance {
Ok(())
}

/// Spawns a background task that runs on every new block but also
/// at least after every `update_interval`.
pub fn spawn_background_task(
self_: Arc<Self>,
run_loop_mode: RunLoopMode,
current_block: CurrentBlockWatcher,
update_interval: Duration,
) {
tokio::task::spawn(async move {
match run_loop_mode {
RunLoopMode::SyncToBlockchain => {
// Update last seen block metric only since everything else will be updated
// inside the runloop.
let mut stream = into_stream(current_block);
loop {
let next_update = timeout(update_interval, stream.next());
match next_update.await {
Ok(Some(block)) => {
metrics().last_seen_block.set(block.number);
}
Ok(None) => break,
Err(_timeout) => {}
};
}
}
RunLoopMode::Unsynchronized => {
let mut latest_block = *current_block.borrow();
let mut stream = into_stream(current_block);
loop {
let next_update = timeout(update_interval, stream.next());
let current_block = match next_update.await {
Ok(Some(block)) => {
metrics().last_seen_block.set(block.number);
block
}
Ok(None) => break,
Err(_timeout) => latest_block,
};
if let Err(err) = self_.update_inner().await {
tracing::warn!(?err, "failed to run background task successfully");
}
if let Err(err) = self_.orders_cache.update(current_block.number).await {
tracing::warn!(?err, "failed to update auction successfully");
}
latest_block = current_block;
}
panic!("block stream terminated unexpectedly");
}
}
});
}

/// Runs the future and collects runtime metrics.
async fn timed_future<T>(label: &str, fut: impl Future<Output = T>) -> T {
let _timer = metrics()
Expand Down
10 changes: 2 additions & 8 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,7 @@ pub async fn run(args: Arguments) {
let trusted_tokens =
AutoUpdatingTokenList::from_configuration(market_makable_token_list_configuration).await;

let mut maintenance = Maintenance::new(
solvable_orders_cache.clone(),
settlement_event_indexer,
db.clone(),
);
let mut maintenance = Maintenance::new(settlement_event_indexer, db.clone());
maintenance.with_cow_amms(&cow_amm_registry);

if let Some(ethflow_contract) = args.ethflow_contract {
Expand Down Expand Up @@ -530,7 +526,6 @@ pub async fn run(args: Arguments) {
submission_deadline: args.submission_deadline as u64,
max_settlement_transaction_wait: args.max_settlement_transaction_wait,
solve_deadline: args.solve_deadline,
synchronization: args.run_loop_mode,
max_run_loop_delay: args.max_run_loop_delay,
max_winners_per_auction: args.max_winners_per_auction,
};
Expand All @@ -554,7 +549,7 @@ pub async fn run(args: Arguments) {
liveness.clone(),
Arc::new(maintenance),
);
run.run_forever(args.auction_update_interval).await;
run.run_forever().await;
unreachable!("run loop exited");
}

Expand Down Expand Up @@ -625,7 +620,6 @@ async fn shadow_mode(args: Arguments) -> ! {
trusted_tokens,
args.solve_deadline,
liveness.clone(),
args.run_loop_mode,
current_block,
args.max_winners_per_auction,
);
Expand Down
74 changes: 29 additions & 45 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
crate::{
arguments::RunLoopMode,
database::competition::Competition,
domain::{
self,
Expand Down Expand Up @@ -47,7 +46,6 @@ pub struct Config {
pub submission_deadline: u64,
pub max_settlement_transaction_wait: Duration,
pub solve_deadline: Duration,
pub synchronization: RunLoopMode,
/// How much time past observing the current block the runloop is
/// allowed to start before it has to re-synchronize to the blockchain
/// by waiting for the next block to appear.
Expand Down Expand Up @@ -105,14 +103,7 @@ impl RunLoop {
}
}

pub async fn run_forever(self, update_interval: Duration) -> ! {
Maintenance::spawn_background_task(
self.maintenance.clone(),
self.config.synchronization,
self.eth.current_block().clone(),
update_interval,
);

pub async fn run_forever(self) -> ! {
let mut last_auction = None;
let mut last_block = None;
let self_arc = Arc::new(self);
Expand All @@ -138,44 +129,37 @@ impl RunLoop {
prev_block: &mut Option<H256>,
) -> Option<domain::Auction> {
// wait for appropriate time to start building the auction
let start_block = match self.config.synchronization {
RunLoopMode::Unsynchronized => {
// Sleep a bit to avoid busy loops.
tokio::time::sleep(std::time::Duration::from_millis(1_000)).await;
*self.eth.current_block().borrow()
}
RunLoopMode::SyncToBlockchain => {
let current_block = *self.eth.current_block().borrow();
let time_since_last_block = current_block.observed_at.elapsed();
let auction_block = if time_since_last_block > self.config.max_run_loop_delay {
if prev_block.is_some_and(|prev_block| prev_block != current_block.hash) {
// don't emit warning if we finished prev run loop within the same block
tracing::warn!(
missed_by = ?time_since_last_block - self.config.max_run_loop_delay,
"missed optimal auction start, wait for new block"
);
}
ethrpc::block_stream::next_block(self.eth.current_block()).await
} else {
current_block
};
let start_block = {
let current_block = *self.eth.current_block().borrow();
let time_since_last_block = current_block.observed_at.elapsed();
let auction_block = if time_since_last_block > self.config.max_run_loop_delay {
if prev_block.is_some_and(|prev_block| prev_block != current_block.hash) {
// don't emit warning if we finished prev run loop within the same block
tracing::warn!(
missed_by = ?time_since_last_block - self.config.max_run_loop_delay,
"missed optimal auction start, wait for new block"
);
}
ethrpc::block_stream::next_block(self.eth.current_block()).await
} else {
current_block
};

self.run_maintenance(&auction_block).await;
match self
.solvable_orders_cache
.update(auction_block.number)
.await
{
Ok(()) => {
self.solvable_orders_cache.track_auction_update("success");
}
Err(err) => {
self.solvable_orders_cache.track_auction_update("failure");
tracing::warn!(?err, "failed to update auction");
}
self.run_maintenance(&auction_block).await;
match self
.solvable_orders_cache
.update(auction_block.number)
.await
{
Ok(()) => {
self.solvable_orders_cache.track_auction_update("success");
}
Err(err) => {
self.solvable_orders_cache.track_auction_update("failure");
tracing::warn!(?err, "failed to update auction");
}
auction_block
}
auction_block
};

let auction = self.cut_auction().await?;
Expand Down
11 changes: 3 additions & 8 deletions crates/autopilot/src/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

use {
crate::{
arguments::RunLoopMode,
domain::{self, competition::TradedOrder},
infra::{
self,
Expand Down Expand Up @@ -41,20 +40,17 @@ pub struct RunLoop {
block: u64,
solve_deadline: Duration,
liveness: Arc<Liveness>,
synchronization: RunLoopMode,
current_block: CurrentBlockWatcher,
max_winners_per_auction: usize,
}

impl RunLoop {
#[allow(clippy::too_many_arguments)]
pub fn new(
orderbook: infra::shadow::Orderbook,
drivers: Vec<Arc<infra::Driver>>,
trusted_tokens: AutoUpdatingTokenList,
solve_deadline: Duration,
liveness: Arc<Liveness>,
synchronization: RunLoopMode,
current_block: CurrentBlockWatcher,
max_winners_per_auction: usize,
) -> Self {
Expand All @@ -70,7 +66,6 @@ impl RunLoop {
block: 0,
solve_deadline,
liveness,
synchronization,
current_block,
max_winners_per_auction,
}
Expand All @@ -79,9 +74,9 @@ impl RunLoop {
pub async fn run_forever(mut self) -> ! {
let mut previous = None;
loop {
if let RunLoopMode::SyncToBlockchain = self.synchronization {
let _ = ethrpc::block_stream::next_block(&self.current_block).await;
};
// We use this as a synchronization mechanism to sync the run loop starts with
// the next mined block
let _ = ethrpc::block_stream::next_block(&self.current_block).await;
m-lord-renkse marked this conversation as resolved.
Show resolved Hide resolved
let Some(auction) = self.next_auction().await else {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
Expand Down
1 change: 0 additions & 1 deletion crates/e2e/src/setup/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ impl<'a> Services<'a> {

let args = [
"autopilot".to_string(),
"--run-loop-mode=sync-to-blockchain".to_string(),
"--max-run-loop-delay=100ms".to_string(),
"--auction-update-interval=1s".to_string(),
"--run-loop-native-price-timeout=500ms".to_string(),
Expand Down
Loading