diff --git a/crates/fuel-core/src/service/sub_services/algorithm_updater.rs b/crates/fuel-core/src/service/sub_services/algorithm_updater.rs index 38a78764f53..002ecebe039 100644 --- a/crates/fuel-core/src/service/sub_services/algorithm_updater.rs +++ b/crates/fuel-core/src/service/sub_services/algorithm_updater.rs @@ -17,8 +17,8 @@ use fuel_core_gas_price_service::{ fuel_gas_price_updater::{ da_source_adapter::{ dummy_costs::DummyDaBlockCosts, - service::DaBlockCostsService, DaBlockCostsProvider, + DaBlockCostsSharedState, }, fuel_core_storage_adapter::{ storage::GasPriceMetadata, @@ -39,6 +39,7 @@ use fuel_core_gas_price_service::{ use fuel_core_services::{ stream::BoxStream, RunnableService, + Service, StateWatcher, }; use fuel_core_storage::{ @@ -62,7 +63,7 @@ use fuel_core_types::{ type Updater = FuelGasPriceUpdater< FuelL2BlockSource, MetadataStorageAdapter, - DaBlockCostsProvider, + DaBlockCostsSharedState, >; pub struct InitializeTask { @@ -73,7 +74,7 @@ pub struct InitializeTask { pub on_chain_db: Database>, pub block_stream: BoxStream, pub shared_algo: SharedGasPriceAlgo, - pub da_block_costs_service: DaBlockCostsService, + pub da_block_costs_provider: DaBlockCostsProvider, } type MetadataStorageAdapter = @@ -100,8 +101,8 @@ impl InitializeTask { // there's no use of this source yet, so we can safely return an error let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("Not used"))); - let da_block_costs_service = - DaBlockCostsService::new(da_block_costs_source, None); + let da_block_costs_provider = + DaBlockCostsProvider::new(da_block_costs_source, None); let task = Self { config, @@ -111,7 +112,7 @@ impl InitializeTask { on_chain_db, block_stream, shared_algo, - da_block_costs_service, + da_block_costs_provider, }; Ok(task) } @@ -173,15 +174,15 @@ impl RunnableService for InitializeTask { self.gas_price_db, self.on_chain_db, self.block_stream, - self.da_block_costs_service.shared_data(), + self.da_block_costs_provider.shared_state, )?; - let inner_service = GasPriceService::new( - starting_height, - updater, - self.shared_algo, - self.da_block_costs_service, - ) - .await; + + self.da_block_costs_provider + .service + .start_and_await() + .await?; + let inner_service = + GasPriceService::new(starting_height, updater, self.shared_algo).await; Ok(inner_service) } } @@ -193,7 +194,7 @@ pub fn get_synced_gas_price_updater( mut gas_price_db: Database>, on_chain_db: Database>, block_stream: BoxStream, - da_block_costs: DaBlockCostsProvider, + da_block_costs: DaBlockCostsSharedState, ) -> anyhow::Result { let mut first_run = false; let latest_block_height: u32 = on_chain_db diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater.rs index cf69e897e46..27ed564abe6 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater.rs @@ -121,7 +121,7 @@ pub struct DaBlockCosts { pub blob_cost_wei: u128, } -pub trait GetDaBlockCosts: Send + Sync + Clone { +pub trait GetDaBlockCosts: Send + Sync { fn get(&mut self) -> Result>; } diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter.rs index 80f1783312a..29fb841fd7e 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter.rs @@ -1,9 +1,18 @@ use crate::fuel_gas_price_updater::{ + da_source_adapter::service::{ + new_service, + DaBlockCostsService, + DaBlockCostsSource, + }, DaBlockCosts, GetDaBlockCosts, Result as GasPriceUpdaterResult, }; -use std::sync::Arc; +use fuel_core_services::ServiceRunner; +use std::{ + sync::Arc, + time::Duration, +}; use tokio::sync::{ mpsc, Mutex, @@ -16,21 +25,42 @@ pub mod service; pub const POLLING_INTERVAL_MS: u64 = 10_000; #[derive(Clone)] -pub struct DaBlockCostsProvider { - receiver: Arc>>, +pub struct DaBlockCostsSharedState { + inner: Arc>>, } -impl DaBlockCostsProvider { - fn from_receiver(receiver: mpsc::Receiver) -> Self { +impl DaBlockCostsSharedState { + fn new(receiver: mpsc::Receiver) -> Self { + Self { + inner: Arc::new(Mutex::new(receiver)), + } + } +} + +pub struct DaBlockCostsProvider { + pub service: ServiceRunner>, + pub shared_state: DaBlockCostsSharedState, +} + +const CHANNEL_BUFFER_SIZE: usize = 10; + +impl DaBlockCostsProvider +where + T: DaBlockCostsSource, +{ + pub fn new(source: T, polling_interval: Option) -> Self { + let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE); + let service = new_service(source, sender, polling_interval); Self { - receiver: Arc::new(Mutex::new(receiver)), + shared_state: DaBlockCostsSharedState::new(receiver), + service, } } } -impl GetDaBlockCosts for DaBlockCostsProvider { +impl GetDaBlockCosts for DaBlockCostsSharedState { fn get(&mut self) -> GasPriceUpdaterResult> { - if let Ok(mut guard) = self.receiver.try_lock() { + if let Ok(mut guard) = self.inner.try_lock() { if let Ok(da_block_costs) = guard.try_recv() { return Ok(Some(da_block_costs)); } @@ -43,10 +73,7 @@ impl GetDaBlockCosts for DaBlockCostsProvider { #[cfg(test)] mod tests { use super::*; - use crate::fuel_gas_price_updater::da_source_adapter::{ - dummy_costs::DummyDaBlockCosts, - service::new_service, - }; + use crate::fuel_gas_price_updater::da_source_adapter::dummy_costs::DummyDaBlockCosts; use fuel_core_services::Service; use std::time::Duration; use tokio::time::sleep; @@ -60,13 +87,16 @@ mod tests { blob_cost_wei: 2, }; let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone())); - let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); - let mut shared_state = service.shared.clone(); + let provider = DaBlockCostsProvider::new( + da_block_costs_source, + Some(Duration::from_millis(1)), + ); + let mut shared_state = provider.shared_state.clone(); // when - service.start_and_await().await.unwrap(); + provider.service.start_and_await().await.unwrap(); sleep(Duration::from_millis(10)).await; - service.stop_and_await().await.unwrap(); + provider.service.stop_and_await().await.unwrap(); // then let da_block_costs_opt = shared_state.get().unwrap(); @@ -83,13 +113,16 @@ mod tests { blob_cost_wei: 1, }; let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone())); - let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); - let mut shared_state = service.shared.clone(); + let provider = DaBlockCostsProvider::new( + da_block_costs_source, + Some(Duration::from_millis(1)), + ); + let mut shared_state = provider.shared_state.clone(); // when - service.start_and_await().await.unwrap(); + provider.service.start_and_await().await.unwrap(); sleep(Duration::from_millis(10)).await; - service.stop_and_await().await.unwrap(); + provider.service.stop_and_await().await.unwrap(); let da_block_costs_opt = shared_state.get().unwrap(); assert!(da_block_costs_opt.is_some()); assert_eq!(da_block_costs_opt.unwrap(), expected_da_cost); @@ -103,13 +136,16 @@ mod tests { async fn run__when_da_block_cost_source_errors_shared_value_is_not_updated() { // given let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!"))); - let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); - let mut shared_state = service.shared.clone(); + let provider = DaBlockCostsProvider::new( + da_block_costs_source, + Some(Duration::from_millis(1)), + ); + let mut shared_state = provider.shared_state.clone(); // when - service.start_and_await().await.unwrap(); + provider.service.start_and_await().await.unwrap(); sleep(Duration::from_millis(10)).await; - service.stop_and_await().await.unwrap(); + provider.service.stop_and_await().await.unwrap(); // then let da_block_costs_opt = shared_state.get().unwrap(); diff --git a/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter/service.rs b/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter/service.rs index 421ac53b0dd..23fc9866120 100644 --- a/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter/service.rs +++ b/crates/services/gas_price_service/src/fuel_gas_price_updater/da_source_adapter/service.rs @@ -1,8 +1,5 @@ use crate::fuel_gas_price_updater::{ - da_source_adapter::{ - DaBlockCostsProvider, - POLLING_INTERVAL_MS, - }, + da_source_adapter::POLLING_INTERVAL_MS, DaBlockCosts, }; use fuel_core_services::{ @@ -25,15 +22,12 @@ use tokio::{ pub use anyhow::Result; -const CHANNEL_BUFFER_SIZE: usize = 10; - /// This struct houses the shared_state, polling interval /// and a source, which does the actual fetching of the data pub struct DaBlockCostsService where Source: DaBlockCostsSource, { - block_cost_provider: DaBlockCostsProvider, poll_interval: Interval, source: Source, sender: Sender, @@ -44,13 +38,14 @@ impl DaBlockCostsService where Source: DaBlockCostsSource, { - pub fn new(source: Source, poll_interval: Option) -> Self { + pub fn new( + source: Source, + sender: Sender, + poll_interval: Option, + ) -> Self { #[allow(clippy::arithmetic_side_effects)] - let (sender, receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFFER_SIZE); - let block_cost_provider = DaBlockCostsProvider::from_receiver(receiver); Self { sender, - block_cost_provider, poll_interval: interval( poll_interval.unwrap_or(Duration::from_millis(POLLING_INTERVAL_MS)), ), @@ -74,15 +69,13 @@ where { const NAME: &'static str = "DaBlockCostsService"; - type SharedData = DaBlockCostsProvider; + type SharedData = (); type Task = Self; type TaskParams = (); - fn shared_data(&self) -> Self::SharedData { - self.block_cost_provider.clone() - } + fn shared_data(&self) -> Self::SharedData {} async fn into_task( mut self, @@ -100,7 +93,7 @@ where Source: DaBlockCostsSource, { /// This function polls the source according to a polling interval - /// described by the DaSourceService + /// described by the DaBlockCostsService async fn run(&mut self, state_watcher: &mut StateWatcher) -> Result { let continue_running; @@ -130,7 +123,8 @@ where pub fn new_service( da_source: S, + sender: Sender, poll_interval: Option, ) -> ServiceRunner> { - ServiceRunner::new(DaBlockCostsService::new(da_source, poll_interval)) + ServiceRunner::new(DaBlockCostsService::new(da_source, sender, poll_interval)) } diff --git a/crates/services/gas_price_service/src/lib.rs b/crates/services/gas_price_service/src/lib.rs index 15ee0725119..10caf66ec54 100644 --- a/crates/services/gas_price_service/src/lib.rs +++ b/crates/services/gas_price_service/src/lib.rs @@ -7,18 +7,12 @@ use async_trait::async_trait; use fuel_core_services::{ RunnableService, RunnableTask, - Service, - ServiceRunner, StateWatcher, }; use fuel_core_types::fuel_types::BlockHeight; use futures::FutureExt; use std::sync::Arc; -use crate::fuel_gas_price_updater::da_source_adapter::{ - dummy_costs::DummyDaBlockCosts, - service::DaBlockCostsService, -}; use tokio::sync::RwLock; pub mod static_updater; @@ -31,8 +25,6 @@ pub struct GasPriceService { next_block_algorithm: SharedGasPriceAlgo, /// The code that is run to update your specific algorithm update_algorithm: U, - // The da block cost sub service - da_block_costs_task_handle: ServiceRunner>, } impl GasPriceService @@ -44,15 +36,12 @@ where starting_block_height: BlockHeight, update_algorithm: U, mut shared_algo: SharedGasPriceAlgo, - da_block_costs_task_service: DaBlockCostsService, ) -> Self { let algorithm = update_algorithm.start(starting_block_height); shared_algo.update(algorithm).await; - let da_block_costs_task_handle = ServiceRunner::new(da_block_costs_task_service); Self { next_block_algorithm: shared_algo, update_algorithm, - da_block_costs_task_handle, } } @@ -145,7 +134,6 @@ where _state_watcher: &StateWatcher, _params: Self::TaskParams, ) -> anyhow::Result { - self.da_block_costs_task_handle.start_and_await().await?; Ok(self) } } @@ -180,7 +168,6 @@ where tracing::debug!("Updating gas price algorithm"); self.update(new_algo).await; } - self.da_block_costs_task_handle.stop_and_await().await?; Ok(()) } } @@ -190,10 +177,6 @@ where #[cfg(test)] mod tests { use crate::{ - fuel_gas_price_updater::da_source_adapter::{ - dummy_costs::DummyDaBlockCosts, - service::DaBlockCostsService, - }, GasPriceAlgorithm, GasPriceService, SharedGasPriceAlgo, @@ -252,12 +235,7 @@ mod tests { price_source: price_receiver, }; let shared_algo = SharedGasPriceAlgo::new_with_algorithm(start_algo); - let da_block_costs_source = DummyDaBlockCosts::new(Ok(Default::default())); - let da_block_costs_service = - DaBlockCostsService::new(da_block_costs_source, None); - let service = - GasPriceService::new(0.into(), updater, shared_algo, da_block_costs_service) - .await; + let service = GasPriceService::new(0.into(), updater, shared_algo).await; let read_algo = service.next_block_algorithm(); let service = ServiceRunner::new(service); service.start_and_await().await.unwrap();