diff --git a/offchain/dispatcher/src/drivers/context.rs b/offchain/dispatcher/src/drivers/context.rs index 6f2809452..9d05600dd 100644 --- a/offchain/dispatcher/src/drivers/context.rs +++ b/offchain/dispatcher/src/drivers/context.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use crate::{ - machine::{rollups_broker::BrokerFacadeError, BrokerSend, RollupStatus}, + machine::{rollups_broker::BrokerFacadeError, BrokerSend}, metrics::DispatcherMetrics, }; @@ -11,49 +11,53 @@ use types::foldables::Input; #[derive(Debug)] pub struct Context { - inputs_sent_count: u64, - last_event_is_finish_epoch: bool, - last_timestamp: u64, + number_of_inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, // constants - genesis_timestamp: u64, + genesis_block: u64, epoch_length: u64, + // metrics dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, } impl Context { pub fn new( - genesis_timestamp: u64, + genesis_block: u64, epoch_length: u64, dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, - status: RollupStatus, + number_of_inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, ) -> Self { + assert!(epoch_length > 0); Self { - inputs_sent_count: status.inputs_sent_count, - last_event_is_finish_epoch: status.last_event_is_finish_epoch, - last_timestamp: genesis_timestamp, - genesis_timestamp, + number_of_inputs_sent, + last_input_epoch, + last_finished_epoch, + genesis_block, epoch_length, dapp_metadata, metrics, } } - pub fn inputs_sent_count(&self) -> u64 { - self.inputs_sent_count + pub fn number_of_inputs_sent(&self) -> u64 { + self.number_of_inputs_sent } pub async fn finish_epoch_if_needed( &mut self, - event_timestamp: u64, + block: u64, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - println!("input_tm: {}", event_timestamp); - if self.should_finish_epoch(event_timestamp) { - self.finish_epoch(event_timestamp, broker).await?; + let epoch = self.calculate_epoch(block); + if self.should_finish_epoch(epoch) { + self.finish_epoch(broker).await?; } Ok(()) } @@ -63,98 +67,119 @@ impl Context { input: &Input, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - broker.enqueue_input(self.inputs_sent_count, input).await?; + let input_block = input.block_added.number.as_u64(); + self.finish_epoch_if_needed(input_block, broker).await?; + + broker + .enqueue_input(self.number_of_inputs_sent, input) + .await?; + self.metrics .advance_inputs_sent .get_or_create(&self.dapp_metadata) .inc(); - self.inputs_sent_count += 1; - self.last_event_is_finish_epoch = false; + + self.number_of_inputs_sent += 1; + self.last_input_epoch = + Some(self.calculate_epoch(input.block_added.number.as_u64())); + Ok(()) } } impl Context { - fn calculate_epoch(&self, timestamp: u64) -> u64 { - assert!(timestamp >= self.genesis_timestamp); - (timestamp - self.genesis_timestamp) / self.epoch_length - } - - // This logic works because we call this function with `event_timestamp` being equal to the - // timestamp of each individual input, rather than just the latest from the blockchain. - fn should_finish_epoch(&self, event_timestamp: u64) -> bool { - if self.inputs_sent_count == 0 || self.last_event_is_finish_epoch { - false - } else { - let current_epoch = self.calculate_epoch(self.last_timestamp); - let event_epoch = self.calculate_epoch(event_timestamp); - event_epoch > current_epoch + fn calculate_epoch(&self, block: u64) -> u64 { + assert!(block >= self.genesis_block); + (block - self.genesis_block) / self.epoch_length + } + + fn should_finish_epoch(&self, epoch: u64) -> bool { + if self.last_finished_epoch == self.last_input_epoch { + return false; // if the current epoch is empty } + + if epoch == self.last_input_epoch.unwrap() { + return false; // if the current epoch is still not over + } + + epoch > self.last_finished_epoch.unwrap_or(0) } async fn finish_epoch( &mut self, - event_timestamp: u64, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - assert!(event_timestamp >= self.genesis_timestamp); - broker.finish_epoch(self.inputs_sent_count).await?; + broker.finish_epoch(self.number_of_inputs_sent).await?; self.metrics .finish_epochs_sent .get_or_create(&self.dapp_metadata) .inc(); - self.last_timestamp = event_timestamp; - self.last_event_is_finish_epoch = true; + + self.last_finished_epoch = self.last_input_epoch; Ok(()) } } #[cfg(test)] -mod private_tests { - use crate::{drivers::mock, metrics::DispatcherMetrics}; +mod tests { + use std::collections::VecDeque; - use super::{Context, DAppMetadata}; + use crate::drivers::mock::Sent; + use rollups_events::DAppMetadata; + use serial_test::serial; - // -------------------------------------------------------------------------------------------- - // calculate_epoch_for - // -------------------------------------------------------------------------------------------- + use crate::{drivers::mock, metrics::DispatcherMetrics}; - fn new_context_for_calculate_epoch_test( - genesis_timestamp: u64, - epoch_length: u64, - ) -> Context { - Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 0, - genesis_timestamp, - epoch_length, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), + use super::Context; + + impl Default for Context { + fn default() -> Self { + Context::new( + /* genesis_block */ 0, + /* epoch_length */ 10, + /* dapp_metadata */ DAppMetadata::default(), + /* metrics */ DispatcherMetrics::default(), + /* number_of_inputs_sent */ 0, + /* last_input_epoch */ None, + /* last_finished_epoch */ None, + ) } } + // -------------------------------------------------------------------------------------------- + // calculate_epoch + // -------------------------------------------------------------------------------------------- + #[test] fn calculate_epoch_with_zero_genesis() { - let epoch_length = 3; - let context = new_context_for_calculate_epoch_test(0, epoch_length); - let n = 10; + let mut context = Context::default(); + context.genesis_block = 0; + context.epoch_length = 10; + + let number_of_epochs = 10; let mut tested = 0; - for epoch in 0..n { - let x = epoch * epoch_length; - let y = (epoch + 1) * epoch_length; - for i in x..y { - assert_eq!(context.calculate_epoch(i), epoch); + for current_epoch in 0..number_of_epochs { + let block_lower_bound = current_epoch * context.epoch_length; + let block_upper_bound = (current_epoch + 1) * context.epoch_length; + for i in block_lower_bound..block_upper_bound { + assert_eq!(context.calculate_epoch(i), current_epoch); tested += 1; } } - assert_eq!(tested, n * epoch_length); - assert_eq!(context.calculate_epoch(9), 3); + + assert_eq!(tested, number_of_epochs * context.epoch_length); + assert_eq!( + context.calculate_epoch(context.epoch_length * number_of_epochs), + context.epoch_length + ); } #[test] fn calculate_epoch_with_offset_genesis() { - let context = new_context_for_calculate_epoch_test(2, 2); + let mut context = Context::default(); + context.genesis_block = 2; + context.epoch_length = 2; + assert_eq!(context.calculate_epoch(2), 0); assert_eq!(context.calculate_epoch(3), 0); assert_eq!(context.calculate_epoch(4), 1); @@ -164,68 +189,120 @@ mod private_tests { #[test] #[should_panic] - fn calculate_epoch_invalid() { - new_context_for_calculate_epoch_test(4, 3).calculate_epoch(2); + fn calculate_epoch_should_panic() { + let mut context = Context::default(); + context.genesis_block = 4; + context.epoch_length = 4; + + context.calculate_epoch(2); + } + + // -------------------------------------------------------------------------------------------- + // should_finish_epoch -- first epoch + // -------------------------------------------------------------------------------------------- + + #[test] + fn should_finish_the_first_epoch() { + let mut context = Context::default(); + context.number_of_inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(10); + assert_eq!(context.should_finish_epoch(epoch), true); + } + + #[test] + fn should_finish_the_first_epoch_by_a_lot() { + let mut context = Context::default(); + context.number_of_inputs_sent = 110; + context.last_input_epoch = Some(9); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(100); + assert_eq!(context.should_finish_epoch(epoch), true); + } + + #[test] + fn should_not_finish_an_empty_first_epoch() { + let mut context = Context::default(); + context.number_of_inputs_sent = 0; + context.last_input_epoch = None; + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(10); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_very_late_empty_first_epoch() { + let mut context = Context::default(); + context.number_of_inputs_sent = 0; + context.last_input_epoch = None; + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(2340); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_timely_first_epoch() { + let mut context = Context::default(); + context.number_of_inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(9); + assert_eq!(context.should_finish_epoch(epoch), false); } // -------------------------------------------------------------------------------------------- - // should_finish_epoch + // should_finish_epoch -- other epochs // -------------------------------------------------------------------------------------------- #[test] - fn should_not_finish_epoch_because_of_time() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(4)); + fn should_finish_epoch() { + let mut context = Context::default(); + context.number_of_inputs_sent = 42; + context.last_input_epoch = Some(4); + context.last_finished_epoch = Some(3); + let epoch = context.calculate_epoch(54); + assert_eq!(context.should_finish_epoch(epoch), true); } #[test] - fn should_not_finish_epoch_because_of_zero_inputs() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(4)); + fn should_finish_epoch_by_a_lot() { + let mut context = Context::default(); + context.number_of_inputs_sent = 142; + context.last_input_epoch = Some(15); + context.last_finished_epoch = Some(2); + let epoch = context.calculate_epoch(190); + assert_eq!(context.should_finish_epoch(epoch), true); } #[test] - fn should_finish_epoch_because_of_time() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(context.should_finish_epoch(5)); + fn should_not_finish_an_empty_epoch() { + let mut context = Context::default(); + context.number_of_inputs_sent = 120; + context.last_input_epoch = Some(9); + context.last_finished_epoch = Some(9); + let epoch = context.calculate_epoch(105); + assert_eq!(context.should_finish_epoch(epoch), false); } #[test] - fn should_finish_epoch_because_last_event_is_finish_epoch() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: true, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(5)); + fn should_not_finish_a_very_late_empty_epoch() { + let mut context = Context::default(); + context.number_of_inputs_sent = 120; + context.last_input_epoch = Some(15); + context.last_finished_epoch = Some(15); + let epoch = context.calculate_epoch(1000); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_timely_epoch() { + let mut context = Context::default(); + context.number_of_inputs_sent = 230; + context.last_input_epoch = Some(11); + context.last_finished_epoch = Some(10); + let epoch = context.calculate_epoch(110); + assert_eq!(context.should_finish_epoch(epoch), false); } // -------------------------------------------------------------------------------------------- @@ -234,72 +311,29 @@ mod private_tests { #[tokio::test] async fn finish_epoch_ok() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - let broker = mock::Broker::new(vec![], vec![]); - let timestamp = 6; - let result = context.finish_epoch(timestamp, &broker).await; - assert!(result.is_ok()); - assert_eq!(context.last_timestamp, timestamp); - assert!(context.last_event_is_finish_epoch); - } + let mut context = Context::default(); + context.number_of_inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; - #[tokio::test] - #[should_panic] - async fn finish_epoch_invalid() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 6, - genesis_timestamp: 5, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; let broker = mock::Broker::new(vec![], vec![]); - let _ = context.finish_epoch(0, &broker).await; + let result = context.finish_epoch(&broker).await; + assert!(result.is_ok()); + assert_eq!(context.number_of_inputs_sent, 1); + assert_eq!(context.last_input_epoch, Some(0)); + assert_eq!(context.last_finished_epoch, Some(0)); } #[tokio::test] async fn finish_epoch_broker_error() { - let last_timestamp = 3; - let last_event_is_finish_epoch = false; - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch, - last_timestamp, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); let broker = mock::Broker::with_finish_epoch_error(); - let result = context.finish_epoch(6, &broker).await; + let result = context.finish_epoch(&broker).await; assert!(result.is_err()); - assert_eq!(context.last_timestamp, last_timestamp); - assert_eq!( - context.last_event_is_finish_epoch, - last_event_is_finish_epoch - ); + assert_eq!(context.number_of_inputs_sent, 0); + assert_eq!(context.last_input_epoch, None); + assert_eq!(context.last_finished_epoch, None); } -} - -#[cfg(test)] -mod public_tests { - use crate::{ - drivers::mock::{self, SendInteraction}, - machine::RollupStatus, - metrics::DispatcherMetrics, - }; - - use super::{Context, DAppMetadata}; // -------------------------------------------------------------------------------------------- // new @@ -307,26 +341,41 @@ mod public_tests { #[tokio::test] async fn new_ok() { - let genesis_timestamp = 42; + let genesis_block = 42; let epoch_length = 24; - let inputs_sent_count = 150; - let last_event_is_finish_epoch = true; - let rollup_status = RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch, - }; + let number_of_inputs_sent = 150; + let last_input_epoch = Some(14); + let last_finished_epoch = Some(37); + let context = Context::new( - genesis_timestamp, + genesis_block, epoch_length, DAppMetadata::default(), DispatcherMetrics::default(), - rollup_status, + number_of_inputs_sent, + last_input_epoch, + last_finished_epoch, ); - assert_eq!(context.genesis_timestamp, genesis_timestamp); - assert_eq!(context.inputs_sent_count, inputs_sent_count); - assert_eq!( - context.last_event_is_finish_epoch, - last_event_is_finish_epoch + + assert_eq!(context.genesis_block, genesis_block); + assert_eq!(context.epoch_length, epoch_length); + assert_eq!(context.dapp_metadata, DAppMetadata::default()); + assert_eq!(context.number_of_inputs_sent, number_of_inputs_sent); + assert_eq!(context.last_input_epoch, last_input_epoch); + assert_eq!(context.last_finished_epoch, last_finished_epoch); + } + + #[test] + #[should_panic] + fn new_should_panic_because_epoch_length_is_zero() { + Context::new( + 0, + 0, + DAppMetadata::default(), + DispatcherMetrics::default(), + 0, + None, + None, ); } @@ -336,17 +385,10 @@ mod public_tests { #[test] fn inputs_sent_count() { - let inputs_sent_count = 42; - let context = Context { - inputs_sent_count, - last_event_is_finish_epoch: false, // ignored - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert_eq!(context.inputs_sent_count(), inputs_sent_count); + let number_of_inputs_sent = 42; + let mut context = Context::default(); + context.number_of_inputs_sent = number_of_inputs_sent; + assert_eq!(context.number_of_inputs_sent(), number_of_inputs_sent); } // -------------------------------------------------------------------------------------------- @@ -355,52 +397,40 @@ mod public_tests { #[tokio::test] async fn finish_epoch_if_needed_true() { - let mut context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 4, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.number_of_inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::new(vec![], vec![]); - let result = context.finish_epoch_if_needed(4, &broker).await; + let result = context.finish_epoch_if_needed(12, &broker).await; assert!(result.is_ok()); - broker - .assert_send_interactions(vec![SendInteraction::FinishedEpoch(1)]); + broker.assert_send_interactions(vec![ + Sent::Finish, // + ]); } #[tokio::test] async fn finish_epoch_if_needed_false() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 2, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.number_of_inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::new(vec![], vec![]); - let result = context.finish_epoch_if_needed(3, &broker).await; + let result = context.finish_epoch_if_needed(9, &broker).await; assert!(result.is_ok()); broker.assert_send_interactions(vec![]); } #[tokio::test] async fn finish_epoch_if_needed_broker_error() { - let mut context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 4, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.number_of_inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; let broker = mock::Broker::with_finish_epoch_error(); - let result = context.finish_epoch_if_needed(4, &broker).await; + let result = context.finish_epoch_if_needed(28, &broker).await; assert!(result.is_err()); } @@ -410,40 +440,215 @@ mod public_tests { #[tokio::test] async fn enqueue_input_ok() { - let inputs_sent_count = 42; - let mut context = Context { - inputs_sent_count, - last_event_is_finish_epoch: true, - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - let input = mock::new_input(2); + let number_of_inputs_sent = 42; + let last_input_epoch = Some(1); + let last_finished_epoch = None; + + let mut context = Context::default(); + context.number_of_inputs_sent = number_of_inputs_sent; + context.last_input_epoch = last_input_epoch; + context.last_finished_epoch = last_finished_epoch; + + let input = mock::new_input(22); let broker = mock::Broker::new(vec![], vec![]); let result = context.enqueue_input(&input, &broker).await; assert!(result.is_ok()); - assert_eq!(context.inputs_sent_count, inputs_sent_count + 1); - assert!(!context.last_event_is_finish_epoch); - broker.assert_send_interactions(vec![SendInteraction::EnqueuedInput( - inputs_sent_count, - )]); + + assert_eq!(context.number_of_inputs_sent, number_of_inputs_sent + 1); + assert_eq!(context.last_input_epoch, Some(2)); + assert_eq!(context.last_finished_epoch, Some(1)); + + broker.assert_send_interactions(vec![ + Sent::Finish, + Sent::Input(number_of_inputs_sent), + ]); } #[tokio::test] async fn enqueue_input_broker_error() { - let mut context = Context { - inputs_sent_count: 42, - last_event_is_finish_epoch: true, - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); let broker = mock::Broker::with_enqueue_input_error(); - let result = context.enqueue_input(&mock::new_input(2), &broker).await; + let result = context.enqueue_input(&mock::new_input(82), &broker).await; assert!(result.is_err()); } + + // -------------------------------------------------------------------------------------------- + // deterministic behavior + // -------------------------------------------------------------------------------------------- + + #[derive(Clone)] + struct Case { + input_blocks: Vec, + epoch_length: u64, + last_block: u64, + expected: Vec, + } + + #[tokio::test] + #[serial] + async fn deterministic_behavior() { + let cases: Vec = vec![ + Case { + input_blocks: vec![], + epoch_length: 2, + last_block: 100, + expected: vec![], + }, + Case { + input_blocks: vec![0, 1, 4, 5], + epoch_length: 2, + last_block: 10, + expected: vec![ + Sent::Input(0), + Sent::Input(1), + Sent::Finish, + Sent::Input(2), + Sent::Input(3), + Sent::Finish, + ], + }, + Case { + input_blocks: vec![0, 2], + epoch_length: 2, + last_block: 4, + expected: vec![ + Sent::Input(0), + Sent::Finish, + Sent::Input(1), + Sent::Finish, + ], + }, + Case { + input_blocks: vec![1, 2, 4], + epoch_length: 2, + last_block: 6, + expected: vec![ + Sent::Input(0), + Sent::Finish, + Sent::Input(1), + Sent::Finish, + Sent::Input(2), + Sent::Finish, + ], + }, + Case { + input_blocks: vec![0, 1, 2, 3, 4, 5, 6, 7], + epoch_length: 2, + last_block: 7, + expected: vec![ + Sent::Input(0), + Sent::Input(1), + Sent::Finish, + Sent::Input(2), + Sent::Input(3), + Sent::Finish, + Sent::Input(4), + Sent::Input(5), + Sent::Finish, + Sent::Input(6), + Sent::Input(7), + ], + }, + Case { + input_blocks: vec![0, 5, 9], + epoch_length: 2, + last_block: 10, + expected: vec![ + Sent::Input(0), + Sent::Finish, + Sent::Input(1), + Sent::Finish, + Sent::Input(2), + Sent::Finish, + ], + }, + ]; + for (i, case) in cases.iter().enumerate() { + println!("Testing case {}.", i); + test_deterministic_case(case.clone()).await; + } + } + + // -------------------------------------------------------------------------------------------- + // auxiliary + // -------------------------------------------------------------------------------------------- + + async fn test_deterministic_case(case: Case) { + let broker1 = one_at_a_time( + case.epoch_length, + case.input_blocks.clone(), + case.last_block, + ) + .await; + let broker2 = all_at_once( + case.epoch_length, + case.input_blocks.clone(), + case.last_block, + ) + .await; + broker1.assert_send_interactions(case.expected.clone()); + broker2.assert_send_interactions(case.expected.clone()); + } + + async fn one_at_a_time( + epoch_length: u64, + input_blocks: Vec, + last_block: u64, + ) -> mock::Broker { + println!("================================================"); + println!("one_block_at_a_time:"); + + let mut input_blocks: VecDeque<_> = input_blocks.into(); + let mut current_input_block = input_blocks.pop_front(); + + let mut context = Context::default(); + context.epoch_length = epoch_length; + let broker = mock::Broker::new(vec![], vec![]); + + for block in 0..=last_block { + if let Some(input_block) = current_input_block { + if block == input_block { + println!("\tenqueue_input(input_block: {})", block); + let input = mock::new_input(block as u32); + let result = context.enqueue_input(&input, &broker).await; + assert!(result.is_ok()); + + current_input_block = input_blocks.pop_front(); + } + } + + println!("\tfinish_epoch_if_needed(block: {})\n", block); + let result = context.finish_epoch_if_needed(block, &broker).await; + assert!(result.is_ok()); + } + + broker + } + + async fn all_at_once( + epoch_length: u64, + input_blocks: Vec, + last_block: u64, + ) -> mock::Broker { + println!("all_inputs_at_once:"); + + let mut context = Context::default(); + context.epoch_length = epoch_length; + let broker = mock::Broker::new(vec![], vec![]); + + for block in input_blocks { + println!("\tenqueue_input(input_block: {})\n", block); + let input = mock::new_input(block as u32); + let result = context.enqueue_input(&input, &broker).await; + assert!(result.is_ok()); + } + + println!("\tfinish_epoch_if_needed(last_block: {})", last_block); + let result = context.finish_epoch_if_needed(last_block, &broker).await; + assert!(result.is_ok()); + + println!("================================================"); + + broker + } } diff --git a/offchain/dispatcher/src/drivers/machine.rs b/offchain/dispatcher/src/drivers/machine.rs index b6d968521..cfd32dcd6 100644 --- a/offchain/dispatcher/src/drivers/machine.rs +++ b/offchain/dispatcher/src/drivers/machine.rs @@ -6,9 +6,9 @@ use super::Context; use crate::machine::{rollups_broker::BrokerFacadeError, BrokerSend}; use eth_state_fold_types::{ethereum_types::Address, Block}; -use types::foldables::{DAppInputBox, Input, InputBox}; +use types::foldables::{DAppInputBox, InputBox}; -use tracing::{debug, instrument, trace}; +use tracing::{debug, instrument}; pub struct MachineDriver { dapp_address: Address, @@ -30,17 +30,14 @@ impl MachineDriver { match input_box.dapp_input_boxes.get(&self.dapp_address) { None => { debug!("No inputs for dapp {}", self.dapp_address); - return Ok(()); } - Some(dapp_input_box) => { self.process_inputs(context, dapp_input_box, broker).await? } }; - println!("block.timestamp: {:?}", block.timestamp.as_u64()); context - .finish_epoch_if_needed(block.timestamp.as_u64(), broker) + .finish_epoch_if_needed(block.number.as_u64(), broker) .await?; Ok(()) @@ -57,476 +54,457 @@ impl MachineDriver { ) -> Result<(), BrokerFacadeError> { tracing::trace!( "Last input sent to machine manager `{}`, current input `{}`", - context.inputs_sent_count(), + context.number_of_inputs_sent(), dapp_input_box.inputs.len() ); let input_slice = dapp_input_box .inputs - .skip(context.inputs_sent_count() as usize); + .skip(context.number_of_inputs_sent() as usize); for input in input_slice { - self.process_input(context, &input, broker).await?; + context.enqueue_input(&input, broker).await?; } Ok(()) } - - #[instrument(level = "trace", skip_all)] - async fn process_input( - &self, - context: &mut Context, - input: &Input, - broker: &impl BrokerSend, - ) -> Result<(), BrokerFacadeError> { - let input_timestamp = input.block_added.timestamp.as_u64(); - trace!(?context, ?input_timestamp); - - context - .finish_epoch_if_needed(input_timestamp, broker) - .await?; - - context.enqueue_input(input, broker).await?; - - Ok(()) - } } -#[cfg(test)] -mod tests { - use eth_state_fold_types::{ethereum_types::H160, Block}; - use rollups_events::DAppMetadata; - use serial_test::serial; - use std::sync::Arc; - - use crate::{ - drivers::{ - mock::{self, SendInteraction}, - Context, - }, - machine::RollupStatus, - metrics::DispatcherMetrics, - }; - - use super::MachineDriver; - - // -------------------------------------------------------------------------------------------- - // process_input - // -------------------------------------------------------------------------------------------- - - async fn test_process_input( - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - let machine_driver = MachineDriver::new(H160::random()); - for block_timestamp in input_timestamps { - let input = mock::new_input(block_timestamp); - let result = machine_driver - .process_input(&mut context, &input, &broker) - .await; - assert!(result.is_ok()); - } - - broker.assert_send_interactions(expected); - } - - #[tokio::test] - async fn process_input_right_before_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![4]; - let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_at_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![5]; - let send_interactions = vec![ - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_last_event_is_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: true, - }; - let input_timestamps = vec![5]; - let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_after_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 3, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![6, 7]; - let send_interactions = vec![ - SendInteraction::FinishedEpoch(3), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_crossing_two_epochs() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![3, 4, 5, 6, 7, 9, 10, 11]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::FinishedEpoch(2), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - SendInteraction::EnqueuedInput(5), - SendInteraction::FinishedEpoch(6), - SendInteraction::EnqueuedInput(6), - SendInteraction::EnqueuedInput(7), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - // -------------------------------------------------------------------------------------------- - // process_inputs - // -------------------------------------------------------------------------------------------- - - async fn test_process_inputs( - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - let machine_driver = MachineDriver::new(H160::random()); - let dapp_input_box = types::foldables::DAppInputBox { - inputs: input_timestamps - .iter() - .map(|timestamp| Arc::new(mock::new_input(*timestamp))) - .collect::>() - .into(), - }; - let result = machine_driver - .process_inputs(&mut context, &dapp_input_box, &broker) - .await; - assert!(result.is_ok()); - - broker.assert_send_interactions(expected); - } - - #[tokio::test] - async fn test_process_inputs_without_skipping() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), - ]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_inputs_with_some_skipping() { - let rollup_status = RollupStatus { - inputs_sent_count: 3, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![SendInteraction::EnqueuedInput(3)]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_inputs_skipping_all() { - let rollup_status = RollupStatus { - inputs_sent_count: 4, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; - } - - // -------------------------------------------------------------------------------------------- - // react - // -------------------------------------------------------------------------------------------- - - async fn test_react( - block: Block, - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 1716392210, - 86400, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - - let dapp_address = H160::random(); - let machine_driver = MachineDriver::new(dapp_address); - - let input_box = mock::new_input_box(); - let input_box = - mock::update_input_box(input_box, dapp_address, input_timestamps); - - let result = machine_driver - .react(&mut context, &block, &input_box, &broker) - .await; - assert!(result.is_ok()); - - broker.assert_send_interactions(expected); - } - - #[tokio::test] - async fn react_without_finish_epoch() { - let block = mock::new_block(3); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn react_with_finish_epoch() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::FinishedEpoch(2), - ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn react_with_internal_finish_epoch() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![4, 5]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), - ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn react_without_inputs() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - let block = mock::new_block(5); - let input_box = mock::new_input_box(); - let machine_driver = MachineDriver::new(H160::random()); - let result = machine_driver - .react(&mut context, &block, &input_box, &broker) - .await; - assert!(result.is_ok()); - broker.assert_send_interactions(vec![]); - } - - #[tokio::test] - async fn react_with_inputs_after_first_epoch_length() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![7, 8]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), - ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - #[serial] - async fn react_bug_buster_original() { - let block = mock::new_block(1716774006); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![ - 1716495424, // - 1716514994, // - 1716550722, // - 1716551814, // - 1716552408, // - 1716558302, // - 1716558322, // - 1716564194, // - 1716564306, // - 1716564696, // - 1716568314, // - 1716568652, // - 1716569100, // - 1716569136, // - 1716578858, // - 1716578948, // - ]; - let expected = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - SendInteraction::EnqueuedInput(5), - SendInteraction::EnqueuedInput(6), - SendInteraction::EnqueuedInput(7), - SendInteraction::EnqueuedInput(8), - SendInteraction::EnqueuedInput(9), - SendInteraction::FinishedEpoch(10), - SendInteraction::EnqueuedInput(10), - SendInteraction::EnqueuedInput(11), - SendInteraction::EnqueuedInput(12), - SendInteraction::EnqueuedInput(13), - SendInteraction::EnqueuedInput(14), - SendInteraction::EnqueuedInput(15), - SendInteraction::FinishedEpoch(16), - ]; - test_react(block, rollup_status, input_timestamps, expected).await; - - let block2 = mock::new_block(1716858268); - let input_timestamps2 = vec![1716858268]; - let expected2 = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - ]; - test_react(block2, rollup_status, input_timestamps2, expected2).await - } - - #[tokio::test] - #[serial] - async fn react_bug_buster_reconstruction() { - let block = mock::new_block(1716859860); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![ - 1716495424, // - 1716514994, // - 1716550722, // - 1716551814, // - 1716552408, // - 1716558302, // - 1716558322, // - 1716564194, // - 1716564306, // - 1716564696, // - 1716568314, // - 1716568652, // - 1716569100, // - 1716569136, // - 1716578858, // - 1716578948, // - 1716858268, // extra - 1716858428, // extra - 1716859860, // extra - ]; - let expected = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - SendInteraction::EnqueuedInput(5), - SendInteraction::EnqueuedInput(6), - SendInteraction::EnqueuedInput(7), - SendInteraction::EnqueuedInput(8), - SendInteraction::EnqueuedInput(9), - SendInteraction::FinishedEpoch(10), - SendInteraction::EnqueuedInput(10), - SendInteraction::EnqueuedInput(11), - SendInteraction::EnqueuedInput(12), - SendInteraction::EnqueuedInput(13), - SendInteraction::EnqueuedInput(14), - SendInteraction::EnqueuedInput(15), - SendInteraction::FinishedEpoch(16), - SendInteraction::EnqueuedInput(16), - SendInteraction::EnqueuedInput(17), - SendInteraction::EnqueuedInput(18), - ]; - test_react(block, rollup_status, input_timestamps, expected).await; - } -} +// #[cfg(test)] +// mod tests { +// use eth_state_fold_types::{ethereum_types::H160, Block}; +// use rollups_events::DAppMetadata; +// use serial_test::serial; +// use std::sync::Arc; +// +// use crate::{ +// drivers::{ +// mock::{self, SendInteraction}, +// Context, +// }, +// machine::RollupStatus, +// metrics::DispatcherMetrics, +// }; +// +// use super::MachineDriver; +// +// // -------------------------------------------------------------------------------------------- +// // process_input +// // -------------------------------------------------------------------------------------------- +// +// async fn test_process_input( +// rollup_status: RollupStatus, +// input_blocks: Vec, +// expected: Vec, +// ) { +// let broker = mock::Broker::new(vec![rollup_status], Vec::new()); +// let mut context = Context::new( +// 0, +// 5, +// DAppMetadata::default(), +// DispatcherMetrics::default(), +// rollup_status, +// ); +// let machine_driver = MachineDriver::new(H160::random()); +// for block_timestamp in input_blocks { +// let input = mock::new_input(block_timestamp); +// let result = machine_driver +// .process_input(&mut context, &input, &broker) +// .await; +// assert!(result.is_ok()); +// } +// +// broker.assert_send_interactions(expected); +// } +// +// #[tokio::test] +// async fn process_input_right_before_finish_epoch() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![4]; +// let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; +// test_process_input(rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn process_input_at_finish_epoch() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 1, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![5]; +// let send_interactions = vec![ +// SendInteraction::FinishedEpoch(1), +// SendInteraction::EnqueuedInput(1), +// ]; +// test_process_input(rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn process_input_last_event_is_finish_epoch() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: true, +// }; +// let input_timestamps = vec![5]; +// let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; +// test_process_input(rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn process_input_after_finish_epoch() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 3, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![6, 7]; +// let send_interactions = vec![ +// SendInteraction::FinishedEpoch(3), +// SendInteraction::EnqueuedInput(3), +// SendInteraction::EnqueuedInput(4), +// ]; +// test_process_input(rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn process_input_crossing_two_epochs() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![3, 4, 5, 6, 7, 9, 10, 11]; +// let send_interactions = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::EnqueuedInput(1), +// SendInteraction::FinishedEpoch(2), +// SendInteraction::EnqueuedInput(2), +// SendInteraction::EnqueuedInput(3), +// SendInteraction::EnqueuedInput(4), +// SendInteraction::EnqueuedInput(5), +// SendInteraction::FinishedEpoch(6), +// SendInteraction::EnqueuedInput(6), +// SendInteraction::EnqueuedInput(7), +// ]; +// test_process_input(rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// // -------------------------------------------------------------------------------------------- +// // process_inputs +// // -------------------------------------------------------------------------------------------- +// +// async fn test_process_inputs( +// rollup_status: RollupStatus, +// input_timestamps: Vec, +// expected: Vec, +// ) { +// let broker = mock::Broker::new(vec![rollup_status], Vec::new()); +// let mut context = Context::new( +// 0, +// 5, +// DAppMetadata::default(), +// DispatcherMetrics::default(), +// rollup_status, +// ); +// let machine_driver = MachineDriver::new(H160::random()); +// let dapp_input_box = types::foldables::DAppInputBox { +// inputs: input_timestamps +// .iter() +// .map(|timestamp| Arc::new(mock::new_input(*timestamp))) +// .collect::>() +// .into(), +// }; +// let result = machine_driver +// .process_inputs(&mut context, &dapp_input_box, &broker) +// .await; +// assert!(result.is_ok()); +// +// broker.assert_send_interactions(expected); +// } +// +// #[tokio::test] +// async fn test_process_inputs_without_skipping() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![1, 2, 3, 4]; +// let send_interactions = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::EnqueuedInput(1), +// SendInteraction::EnqueuedInput(2), +// SendInteraction::EnqueuedInput(3), +// ]; +// test_process_inputs(rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn process_inputs_with_some_skipping() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 3, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![1, 2, 3, 4]; +// let send_interactions = vec![SendInteraction::EnqueuedInput(3)]; +// test_process_inputs(rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn process_inputs_skipping_all() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 4, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![1, 2, 3, 4]; +// let send_interactions = vec![]; +// test_process_inputs(rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// // -------------------------------------------------------------------------------------------- +// // react +// // -------------------------------------------------------------------------------------------- +// +// async fn test_react( +// block: Block, +// rollup_status: RollupStatus, +// input_timestamps: Vec, +// expected: Vec, +// ) { +// let broker = mock::Broker::new(vec![rollup_status], Vec::new()); +// let mut context = Context::new( +// 1716392210, +// 86400, +// DAppMetadata::default(), +// DispatcherMetrics::default(), +// rollup_status, +// ); +// +// let dapp_address = H160::random(); +// let machine_driver = MachineDriver::new(dapp_address); +// +// let input_box = mock::new_input_box(); +// let input_box = +// mock::update_input_box(input_box, dapp_address, input_timestamps); +// +// let result = machine_driver +// .react(&mut context, &block, &input_box, &broker) +// .await; +// assert!(result.is_ok()); +// +// broker.assert_send_interactions(expected); +// } +// +// #[tokio::test] +// async fn react_without_finish_epoch() { +// let block = mock::new_block(3); +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![1, 2]; +// let send_interactions = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::EnqueuedInput(1), +// ]; +// test_react(block, rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn react_with_finish_epoch() { +// let block = mock::new_block(5); +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![1, 2]; +// let send_interactions = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::EnqueuedInput(1), +// SendInteraction::FinishedEpoch(2), +// ]; +// test_react(block, rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn react_with_internal_finish_epoch() { +// let block = mock::new_block(5); +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![4, 5]; +// let send_interactions = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::FinishedEpoch(1), +// SendInteraction::EnqueuedInput(1), +// ]; +// test_react(block, rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// async fn react_without_inputs() { +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let broker = mock::Broker::new(vec![rollup_status], Vec::new()); +// let mut context = Context::new( +// 0, +// 5, +// DAppMetadata::default(), +// DispatcherMetrics::default(), +// rollup_status, +// ); +// let block = mock::new_block(5); +// let input_box = mock::new_input_box(); +// let machine_driver = MachineDriver::new(H160::random()); +// let result = machine_driver +// .react(&mut context, &block, &input_box, &broker) +// .await; +// assert!(result.is_ok()); +// broker.assert_send_interactions(vec![]); +// } +// +// #[tokio::test] +// async fn react_with_inputs_after_first_epoch_length() { +// let block = mock::new_block(5); +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![7, 8]; +// let send_interactions = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::FinishedEpoch(1), +// SendInteraction::EnqueuedInput(1), +// ]; +// test_react(block, rollup_status, input_timestamps, send_interactions) +// .await; +// } +// +// #[tokio::test] +// #[serial] +// async fn react_bug_buster_original() { +// let block = mock::new_block(1716774006); +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![ +// 1716495424, // +// 1716514994, // +// 1716550722, // +// 1716551814, // +// 1716552408, // +// 1716558302, // +// 1716558322, // +// 1716564194, // +// 1716564306, // +// 1716564696, // +// 1716568314, // +// 1716568652, // +// 1716569100, // +// 1716569136, // +// 1716578858, // +// 1716578948, // +// ]; +// let expected = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::FinishedEpoch(1), +// SendInteraction::EnqueuedInput(1), +// SendInteraction::EnqueuedInput(2), +// SendInteraction::EnqueuedInput(3), +// SendInteraction::EnqueuedInput(4), +// SendInteraction::EnqueuedInput(5), +// SendInteraction::EnqueuedInput(6), +// SendInteraction::EnqueuedInput(7), +// SendInteraction::EnqueuedInput(8), +// SendInteraction::EnqueuedInput(9), +// SendInteraction::FinishedEpoch(10), +// SendInteraction::EnqueuedInput(10), +// SendInteraction::EnqueuedInput(11), +// SendInteraction::EnqueuedInput(12), +// SendInteraction::EnqueuedInput(13), +// SendInteraction::EnqueuedInput(14), +// SendInteraction::EnqueuedInput(15), +// SendInteraction::FinishedEpoch(16), +// ]; +// test_react(block, rollup_status, input_timestamps, expected).await; +// +// let block2 = mock::new_block(1716858268); +// let input_timestamps2 = vec![1716858268]; +// let expected2 = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::FinishedEpoch(1), +// ]; +// test_react(block2, rollup_status, input_timestamps2, expected2).await +// } +// +// #[tokio::test] +// #[serial] +// async fn react_bug_buster_reconstruction() { +// let block = mock::new_block(1716859860); +// let rollup_status = RollupStatus { +// last_inputs_sent: 0, +// last_event_is_finish_epoch: false, +// }; +// let input_timestamps = vec![ +// 1716495424, // +// 1716514994, // +// 1716550722, // +// 1716551814, // +// 1716552408, // +// 1716558302, // +// 1716558322, // +// 1716564194, // +// 1716564306, // +// 1716564696, // +// 1716568314, // +// 1716568652, // +// 1716569100, // +// 1716569136, // +// 1716578858, // +// 1716578948, // +// 1716858268, // extra +// 1716858428, // extra +// 1716859860, // extra +// ]; +// let expected = vec![ +// SendInteraction::EnqueuedInput(0), +// SendInteraction::FinishedEpoch(1), +// SendInteraction::EnqueuedInput(1), +// SendInteraction::EnqueuedInput(2), +// SendInteraction::EnqueuedInput(3), +// SendInteraction::EnqueuedInput(4), +// SendInteraction::EnqueuedInput(5), +// SendInteraction::EnqueuedInput(6), +// SendInteraction::EnqueuedInput(7), +// SendInteraction::EnqueuedInput(8), +// SendInteraction::EnqueuedInput(9), +// SendInteraction::FinishedEpoch(10), +// SendInteraction::EnqueuedInput(10), +// SendInteraction::EnqueuedInput(11), +// SendInteraction::EnqueuedInput(12), +// SendInteraction::EnqueuedInput(13), +// SendInteraction::EnqueuedInput(14), +// SendInteraction::EnqueuedInput(15), +// SendInteraction::FinishedEpoch(16), +// SendInteraction::EnqueuedInput(16), +// SendInteraction::EnqueuedInput(17), +// SendInteraction::EnqueuedInput(18), +// ]; +// test_react(block, rollup_status, input_timestamps, expected).await; +// } +// } diff --git a/offchain/dispatcher/src/drivers/mock.rs b/offchain/dispatcher/src/drivers/mock.rs index 3197e1683..65945c1d9 100644 --- a/offchain/dispatcher/src/drivers/mock.rs +++ b/offchain/dispatcher/src/drivers/mock.rs @@ -3,10 +3,9 @@ use async_trait::async_trait; use eth_state_fold_types::{ - ethereum_types::{Address, Bloom, H160, H256}, + ethereum_types::{Bloom, H160, H256}, Block, }; -use im::{hashmap, Vector}; use rollups_events::RollupsClaim; use snafu::whatever; use std::{ @@ -14,7 +13,7 @@ use std::{ ops::{Deref, DerefMut}, sync::{Arc, Mutex}, }; -use types::foldables::{DAppInputBox, Input, InputBox}; +use types::foldables::Input; use crate::machine::{ rollups_broker::BrokerFacadeError, BrokerSend, BrokerStatus, RollupStatus, @@ -24,69 +23,69 @@ use crate::machine::{ // auxiliary functions // ------------------------------------------------------------------------------------------------ -pub fn new_block(timestamp: u32) -> Block { +pub fn new_block(number: u32) -> Block { Block { hash: H256::random(), - number: 0.into(), + number: number.into(), parent_hash: H256::random(), - timestamp: timestamp.into(), + timestamp: 0.into(), logs_bloom: Bloom::default(), } } -pub fn new_input(timestamp: u32) -> Input { +pub fn new_input(block: u32) -> Input { Input { sender: Arc::new(H160::random()), payload: vec![], - block_added: Arc::new(new_block(timestamp)), + block_added: Arc::new(new_block(block)), dapp: Arc::new(H160::random()), tx_hash: Arc::new(H256::default()), } } -pub fn new_input_box() -> InputBox { - InputBox { - dapp_address: Arc::new(H160::random()), - input_box_address: Arc::new(H160::random()), - dapp_input_boxes: Arc::new(hashmap! {}), - } -} - -pub fn update_input_box( - input_box: InputBox, - dapp_address: Address, - timestamps: Vec, -) -> InputBox { - let inputs = timestamps - .iter() - .map(|timestamp| Arc::new(new_input(*timestamp))) - .collect::>(); - let inputs = Vector::from(inputs); - let dapp_input_boxes = input_box - .dapp_input_boxes - .update(Arc::new(dapp_address), Arc::new(DAppInputBox { inputs })); - InputBox { - dapp_address: Arc::new(dapp_address), - input_box_address: input_box.input_box_address, - dapp_input_boxes: Arc::new(dapp_input_boxes), - } -} +// pub fn new_input_box() -> InputBox { +// InputBox { +// dapp_address: Arc::new(H160::random()), +// input_box_address: Arc::new(H160::random()), +// dapp_input_boxes: Arc::new(hashmap! {}), +// } +// } +// +// pub fn update_input_box( +// input_box: InputBox, +// dapp_address: Address, +// timestamps: Vec, +// ) -> InputBox { +// let inputs = timestamps +// .iter() +// .map(|timestamp| Arc::new(new_input(*timestamp))) +// .collect::>(); +// let inputs = Vector::from(inputs); +// let dapp_input_boxes = input_box +// .dapp_input_boxes +// .update(Arc::new(dapp_address), Arc::new(DAppInputBox { inputs })); +// InputBox { +// dapp_address: Arc::new(dapp_address), +// input_box_address: input_box.input_box_address, +// dapp_input_boxes: Arc::new(dapp_input_boxes), +// } +// } // ------------------------------------------------------------------------------------------------ // Broker // ------------------------------------------------------------------------------------------------ #[derive(Debug, Clone, Copy, PartialEq)] -pub enum SendInteraction { - EnqueuedInput(u64), - FinishedEpoch(u64), +pub enum Sent { + Input(u64), // input index + Finish, } #[derive(Debug)] pub struct Broker { pub rollup_statuses: Mutex>, pub next_claims: Mutex>, - pub send_interactions: Mutex>, + pub send_interactions: Mutex>, status_error: bool, enqueue_input_error: bool, finish_epoch_error: bool, @@ -131,12 +130,12 @@ impl Broker { mutex_guard.deref().len() } - fn get_send_interaction(&self, i: usize) -> SendInteraction { + fn get_send_interaction(&self, i: usize) -> Sent { let mutex_guard = self.send_interactions.lock().unwrap(); mutex_guard.deref().get(i).unwrap().clone() } - pub fn assert_send_interactions(&self, expected: Vec) { + pub fn assert_send_interactions(&self, expected: Vec) { assert_eq!( self.send_interactions_len(), expected.len(), @@ -179,24 +178,17 @@ impl BrokerSend for Broker { whatever!("enqueue_input error") } else { let mut mutex_guard = self.send_interactions.lock().unwrap(); - mutex_guard - .deref_mut() - .push(SendInteraction::EnqueuedInput(input_index)); + mutex_guard.deref_mut().push(Sent::Input(input_index)); Ok(()) } } - async fn finish_epoch( - &self, - inputs_sent_count: u64, - ) -> Result<(), BrokerFacadeError> { + async fn finish_epoch(&self, _: u64) -> Result<(), BrokerFacadeError> { if self.finish_epoch_error { whatever!("finish_epoch error") } else { let mut mutex_guard = self.send_interactions.lock().unwrap(); - mutex_guard - .deref_mut() - .push(SendInteraction::FinishedEpoch(inputs_sent_count)); + mutex_guard.deref_mut().push(Sent::Finish); Ok(()) } } diff --git a/offchain/dispatcher/src/machine/mod.rs b/offchain/dispatcher/src/machine/mod.rs index 735fdff23..59a54c131 100644 --- a/offchain/dispatcher/src/machine/mod.rs +++ b/offchain/dispatcher/src/machine/mod.rs @@ -11,8 +11,7 @@ use self::rollups_broker::BrokerFacadeError; #[derive(Debug, Clone, Copy, Default)] pub struct RollupStatus { - pub inputs_sent_count: u64, - pub last_event_is_finish_epoch: bool, + pub number_of_inputs_sent: u64, } #[async_trait] diff --git a/offchain/dispatcher/src/machine/rollups_broker.rs b/offchain/dispatcher/src/machine/rollups_broker.rs index e430e4fe7..bdd9d2fac 100644 --- a/offchain/dispatcher/src/machine/rollups_broker.rs +++ b/offchain/dispatcher/src/machine/rollups_broker.rs @@ -192,13 +192,11 @@ impl From for RollupStatus { match payload.data { RollupsData::AdvanceStateInput { .. } => RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch: false, + number_of_inputs_sent: inputs_sent_count, }, RollupsData::FinishEpoch { .. } => RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch: true, + number_of_inputs_sent: inputs_sent_count, }, } } @@ -249,7 +247,7 @@ fn build_next_input( block_number: input.block_added.number.as_u64(), timestamp: input.block_added.timestamp.as_u64(), epoch_index: 0, - input_index: status.status.inputs_sent_count, + input_index: status.status.number_of_inputs_sent, }; let data = RollupsData::AdvanceStateInput(RollupsAdvanceStateInput { @@ -261,7 +259,7 @@ fn build_next_input( RollupsInput { parent_id: status.id.clone(), epoch_index: status.epoch_number, - inputs_sent_count: status.status.inputs_sent_count + 1, + inputs_sent_count: status.status.number_of_inputs_sent + 1, data, } } @@ -270,261 +268,261 @@ fn build_next_finish_epoch(status: &BrokerStreamStatus) -> RollupsInput { RollupsInput { parent_id: status.id.clone(), epoch_index: status.epoch_number, - inputs_sent_count: status.status.inputs_sent_count, + inputs_sent_count: status.status.number_of_inputs_sent, data: RollupsData::FinishEpoch {}, } } -#[cfg(test)] -mod broker_facade_tests { - use std::{sync::Arc, time::Duration}; - - use backoff::ExponentialBackoffBuilder; - use eth_state_fold_types::{ - ethereum_types::{Bloom, H160, H256, U256, U64}, - Block, - }; - use rollups_events::{ - BrokerConfig, BrokerEndpoint, DAppMetadata, Hash, InputMetadata, - Payload, RedactedUrl, RollupsAdvanceStateInput, RollupsData, Url, - }; - use test_fixtures::broker::BrokerFixture; - use testcontainers::clients::Cli; - use types::foldables::Input; - - use crate::machine::{ - rollups_broker::BrokerFacadeError, BrokerSend, BrokerStatus, - }; - - use super::BrokerFacade; - - // -------------------------------------------------------------------------------------------- - // new - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn new_ok() { - let docker = Cli::default(); - let (_fixture, _broker) = setup(&docker).await; - } - - #[tokio::test] - async fn new_error() { - let docker = Cli::default(); - let error = failable_setup(&docker, true) - .await - .err() - .expect("'status' function has not failed") - .to_string(); - // BrokerFacadeError::BrokerConnectionError - assert_eq!(error, "error connecting to the broker"); - } - - // -------------------------------------------------------------------------------------------- - // status - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn status_inputs_sent_count_equals_0() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 0); - assert!(!status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_inputs_sent_count_equals_1() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 1).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 1); - assert!(!status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_inputs_sent_count_equals_10() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 10).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 10); - assert!(!status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_is_finish_epoch() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_finish_epoch_input(&fixture).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 0); - assert!(status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_inputs_with_finish_epoch() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 5).await; - produce_finish_epoch_input(&fixture).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 5); - assert!(status.last_event_is_finish_epoch); - } - - // -------------------------------------------------------------------------------------------- - // enqueue_input - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn enqueue_input_ok() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - for i in 0..3 { - assert!(broker - .enqueue_input(i, &new_enqueue_input()) - .await - .is_ok()); - } - } - - #[tokio::test] - #[should_panic( - expected = "assertion `left == right` failed\n left: 1\n right: 6" - )] - async fn enqueue_input_assertion_error_1() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - let _ = broker.enqueue_input(5, &new_enqueue_input()).await; - } - - #[tokio::test] - #[should_panic( - expected = "assertion `left == right` failed\n left: 5\n right: 6" - )] - async fn enqueue_input_assertion_error_2() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - for i in 0..4 { - assert!(broker - .enqueue_input(i, &new_enqueue_input()) - .await - .is_ok()); - } - let _ = broker.enqueue_input(5, &new_enqueue_input()).await; - } - - // NOTE: cannot test result error because the dependency is not injectable. - - // -------------------------------------------------------------------------------------------- - // finish_epoch - // -------------------------------------------------------------------------------------------- - - #[tokio::test] - async fn finish_epoch_ok_1() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - assert!(broker.finish_epoch(0).await.is_ok()); - // BONUS TEST: testing for a finished epoch with no inputs - assert!(broker.finish_epoch(0).await.is_ok()); - } - - #[tokio::test] - async fn finish_epoch_ok_2() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - let first_epoch_inputs = 3; - produce_advance_state_inputs(&fixture, first_epoch_inputs).await; - produce_finish_epoch_input(&fixture).await; - let second_epoch_inputs = 7; - produce_advance_state_inputs(&fixture, second_epoch_inputs).await; - let total_inputs = first_epoch_inputs + second_epoch_inputs; - assert!(broker.finish_epoch(total_inputs as u64).await.is_ok()); - } - - #[tokio::test] - #[should_panic( - expected = "assertion `left == right` failed\n left: 0\n right: 1" - )] - async fn finish_epoch_assertion_error() { - let docker = Cli::default(); - let (_fixture, broker) = setup(&docker).await; - let _ = broker.finish_epoch(1).await; - } - - // NOTE: cannot test result error because the dependency is not injectable. - - // -------------------------------------------------------------------------------------------- - // auxiliary - // -------------------------------------------------------------------------------------------- - - async fn failable_setup( - docker: &Cli, - should_fail: bool, - ) -> Result<(BrokerFixture, BrokerFacade), BrokerFacadeError> { - let fixture = BrokerFixture::setup(docker).await; - let redis_endpoint = if should_fail { - BrokerEndpoint::Single(RedactedUrl::new( - Url::parse("https://invalid.com").unwrap(), - )) - } else { - fixture.redis_endpoint().clone() - }; - let config = BrokerConfig { - redis_endpoint, - consume_timeout: 300000, - backoff: ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(1000)) - .with_max_elapsed_time(Some(Duration::from_millis(3000))) - .build(), - }; - let metadata = DAppMetadata { - chain_id: fixture.chain_id(), - dapp_address: fixture.dapp_address().clone(), - }; - let broker = BrokerFacade::new(config, metadata).await?; - Ok((fixture, broker)) - } - - async fn setup(docker: &Cli) -> (BrokerFixture, BrokerFacade) { - failable_setup(docker, false).await.unwrap() - } - - fn new_enqueue_input() -> Input { - Input { - sender: Arc::new(H160::random()), - payload: vec![], - block_added: Arc::new(Block { - hash: H256::random(), - number: U64::zero(), - parent_hash: H256::random(), - timestamp: U256::zero(), - logs_bloom: Bloom::default(), - }), - dapp: Arc::new(H160::random()), - tx_hash: Arc::new(H256::random()), - } - } - - async fn produce_advance_state_inputs(fixture: &BrokerFixture<'_>, n: u32) { - for _ in 0..n { - let _ = fixture - .produce_input_event(RollupsData::AdvanceStateInput( - RollupsAdvanceStateInput { - metadata: InputMetadata::default(), - payload: Payload::default(), - tx_hash: Hash::default(), - }, - )) - .await; - } - } - - async fn produce_finish_epoch_input(fixture: &BrokerFixture<'_>) { - let _ = fixture - .produce_input_event(RollupsData::FinishEpoch {}) - .await; - } -} +// #[cfg(test)] +// mod broker_facade_tests { +// use std::{sync::Arc, time::Duration}; +// +// use backoff::ExponentialBackoffBuilder; +// use eth_state_fold_types::{ +// ethereum_types::{Bloom, H160, H256, U256, U64}, +// Block, +// }; +// use rollups_events::{ +// BrokerConfig, BrokerEndpoint, DAppMetadata, Hash, InputMetadata, +// Payload, RedactedUrl, RollupsAdvanceStateInput, RollupsData, Url, +// }; +// use test_fixtures::broker::BrokerFixture; +// use testcontainers::clients::Cli; +// use types::foldables::Input; +// +// use crate::machine::{ +// rollups_broker::BrokerFacadeError, BrokerSend, BrokerStatus, +// }; +// +// use super::BrokerFacade; +// +// // -------------------------------------------------------------------------------------------- +// // new +// // -------------------------------------------------------------------------------------------- +// +// #[tokio::test] +// async fn new_ok() { +// let docker = Cli::default(); +// let (_fixture, _broker) = setup(&docker).await; +// } +// +// #[tokio::test] +// async fn new_error() { +// let docker = Cli::default(); +// let error = failable_setup(&docker, true) +// .await +// .err() +// .expect("'status' function has not failed") +// .to_string(); +// // BrokerFacadeError::BrokerConnectionError +// assert_eq!(error, "error connecting to the broker"); +// } +// +// // -------------------------------------------------------------------------------------------- +// // status +// // -------------------------------------------------------------------------------------------- +// +// #[tokio::test] +// async fn status_inputs_sent_count_equals_0() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.number_of_inputs_sent, 0); +// assert!(!status.last_event_is_finish_epoch); +// } +// +// #[tokio::test] +// async fn status_inputs_sent_count_equals_1() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// produce_advance_state_inputs(&fixture, 1).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.number_of_inputs_sent, 1); +// assert!(!status.last_event_is_finish_epoch); +// } +// +// #[tokio::test] +// async fn status_inputs_sent_count_equals_10() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// produce_advance_state_inputs(&fixture, 10).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.number_of_inputs_sent, 10); +// assert!(!status.number_of_inputs_sent); +// } +// +// #[tokio::test] +// async fn status_is_finish_epoch() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// produce_finish_epoch_input(&fixture).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.number_of_inputs_sent, 0); +// assert!(status.last_event_is_finish_epoch); +// } +// +// #[tokio::test] +// async fn status_inputs_with_finish_epoch() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// produce_advance_state_inputs(&fixture, 5).await; +// produce_finish_epoch_input(&fixture).await; +// let status = broker.status().await.expect("'status' function failed"); +// assert_eq!(status.last_inputs_sent, 5); +// assert!(status.last_event_is_finish_epoch); +// } +// +// // -------------------------------------------------------------------------------------------- +// // enqueue_input +// // -------------------------------------------------------------------------------------------- +// +// #[tokio::test] +// async fn enqueue_input_ok() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// for i in 0..3 { +// assert!(broker +// .enqueue_input(i, &new_enqueue_input()) +// .await +// .is_ok()); +// } +// } +// +// #[tokio::test] +// #[should_panic( +// expected = "assertion `left == right` failed\n left: 1\n right: 6" +// )] +// async fn enqueue_input_assertion_error_1() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// let _ = broker.enqueue_input(5, &new_enqueue_input()).await; +// } +// +// #[tokio::test] +// #[should_panic( +// expected = "assertion `left == right` failed\n left: 5\n right: 6" +// )] +// async fn enqueue_input_assertion_error_2() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// for i in 0..4 { +// assert!(broker +// .enqueue_input(i, &new_enqueue_input()) +// .await +// .is_ok()); +// } +// let _ = broker.enqueue_input(5, &new_enqueue_input()).await; +// } +// +// // NOTE: cannot test result error because the dependency is not injectable. +// +// // -------------------------------------------------------------------------------------------- +// // finish_epoch +// // -------------------------------------------------------------------------------------------- +// +// #[tokio::test] +// async fn finish_epoch_ok_1() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// assert!(broker.finish_epoch(0).await.is_ok()); +// // BONUS TEST: testing for a finished epoch with no inputs +// assert!(broker.finish_epoch(0).await.is_ok()); +// } +// +// #[tokio::test] +// async fn finish_epoch_ok_2() { +// let docker = Cli::default(); +// let (fixture, broker) = setup(&docker).await; +// let first_epoch_inputs = 3; +// produce_advance_state_inputs(&fixture, first_epoch_inputs).await; +// produce_finish_epoch_input(&fixture).await; +// let second_epoch_inputs = 7; +// produce_advance_state_inputs(&fixture, second_epoch_inputs).await; +// let total_inputs = first_epoch_inputs + second_epoch_inputs; +// assert!(broker.finish_epoch(total_inputs as u64).await.is_ok()); +// } +// +// #[tokio::test] +// #[should_panic( +// expected = "assertion `left == right` failed\n left: 0\n right: 1" +// )] +// async fn finish_epoch_assertion_error() { +// let docker = Cli::default(); +// let (_fixture, broker) = setup(&docker).await; +// let _ = broker.finish_epoch(1).await; +// } +// +// // NOTE: cannot test result error because the dependency is not injectable. +// +// // -------------------------------------------------------------------------------------------- +// // auxiliary +// // -------------------------------------------------------------------------------------------- +// +// async fn failable_setup( +// docker: &Cli, +// should_fail: bool, +// ) -> Result<(BrokerFixture, BrokerFacade), BrokerFacadeError> { +// let fixture = BrokerFixture::setup(docker).await; +// let redis_endpoint = if should_fail { +// BrokerEndpoint::Single(RedactedUrl::new( +// Url::parse("https://invalid.com").unwrap(), +// )) +// } else { +// fixture.redis_endpoint().clone() +// }; +// let config = BrokerConfig { +// redis_endpoint, +// consume_timeout: 300000, +// backoff: ExponentialBackoffBuilder::new() +// .with_initial_interval(Duration::from_millis(1000)) +// .with_max_elapsed_time(Some(Duration::from_millis(3000))) +// .build(), +// }; +// let metadata = DAppMetadata { +// chain_id: fixture.chain_id(), +// dapp_address: fixture.dapp_address().clone(), +// }; +// let broker = BrokerFacade::new(config, metadata).await?; +// Ok((fixture, broker)) +// } +// +// async fn setup(docker: &Cli) -> (BrokerFixture, BrokerFacade) { +// failable_setup(docker, false).await.unwrap() +// } +// +// fn new_enqueue_input() -> Input { +// Input { +// sender: Arc::new(H160::random()), +// payload: vec![], +// block_added: Arc::new(Block { +// hash: H256::random(), +// number: U64::zero(), +// parent_hash: H256::random(), +// timestamp: U256::zero(), +// logs_bloom: Bloom::default(), +// }), +// dapp: Arc::new(H160::random()), +// tx_hash: Arc::new(H256::random()), +// } +// } +// +// async fn produce_advance_state_inputs(fixture: &BrokerFixture<'_>, n: u32) { +// for _ in 0..n { +// let _ = fixture +// .produce_input_event(RollupsData::AdvanceStateInput( +// RollupsAdvanceStateInput { +// metadata: InputMetadata::default(), +// payload: Payload::default(), +// tx_hash: Hash::default(), +// }, +// )) +// .await; +// } +// } +// +// async fn produce_finish_epoch_input(fixture: &BrokerFixture<'_>) { +// let _ = fixture +// .produce_input_event(RollupsData::FinishEpoch {}) +// .await; +// } +// } diff --git a/offchain/dispatcher/src/setup.rs b/offchain/dispatcher/src/setup.rs index 97f632168..872865aa1 100644 --- a/offchain/dispatcher/src/setup.rs +++ b/offchain/dispatcher/src/setup.rs @@ -82,11 +82,11 @@ pub async fn create_context( ) -> Result { let dapp_deployment_block_number = U64::from(config.blockchain_config.dapp_deployment_block_number); - let genesis_timestamp: u64 = block_server + let genesis_block = block_server .query_block(dapp_deployment_block_number) .await .context(StateServerSnafu)? - .timestamp + .number .as_u64(); let epoch_length = config.epoch_duration; @@ -94,14 +94,16 @@ pub async fn create_context( // The dispatcher doesn't work properly if there are inputs in the broker from a previous run. // Hence, we make sure that the broker is in a clean state before starting. - ensure!(status.inputs_sent_count == 0, DirtyBrokerSnafu); + ensure!(status.number_of_inputs_sent == 0, DirtyBrokerSnafu); let context = Context::new( - genesis_timestamp, + genesis_block, epoch_length, dapp_metadata, metrics, - status, + 0, + None, + None, ); Ok(context)