diff --git a/crates/services/gas_price_service/src/v1/da_source_service.rs b/crates/services/gas_price_service/src/v1/da_source_service.rs index 0c90b9a36d..7aea110ddd 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service.rs @@ -23,11 +23,14 @@ mod tests { use super::*; use crate::v1::da_source_service::{ dummy_costs::DummyDaBlockCosts, - service::new_service, + service::new_da_service, }; use fuel_core_services::Service; use std::{ - sync::Arc, + sync::{ + Arc, + Mutex, + }, time::Duration, }; @@ -43,7 +46,12 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone()); - let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); + let latest_l2_height = Arc::new(Mutex::new(0u32)); + let service = new_da_service( + da_block_costs_source, + Some(Duration::from_millis(1)), + latest_l2_height, + ); let mut shared_state = &mut service.shared.subscribe(); // when @@ -62,7 +70,12 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone()); - let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); + let latest_l2_height = Arc::new(Mutex::new(0u32)); + let service = new_da_service( + da_block_costs_source, + Some(Duration::from_millis(1)), + latest_l2_height, + ); let mut shared_state = &mut service.shared.subscribe(); // when diff --git a/crates/services/gas_price_service/src/v1/da_source_service/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs index 675d82cda5..9c5c422724 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -5,7 +5,13 @@ use fuel_core_services::{ StateWatcher, TaskNextAction, }; -use std::time::Duration; +use std::{ + sync::{ + Arc, + Mutex, + }, + time::Duration, +}; use tokio::{ sync::broadcast::Sender, time::{ @@ -37,6 +43,7 @@ pub struct DaSourceService { poll_interval: Interval, source: Source, shared_state: SharedState, + latest_l2_height: Arc>, } const DA_BLOCK_COSTS_CHANNEL_SIZE: usize = 16 * 1024; @@ -46,7 +53,11 @@ impl DaSourceService where Source: DaBlockCostsSource, { - pub fn new(source: Source, poll_interval: Option) -> Self { + pub fn new( + source: Source, + poll_interval: Option, + latest_l2_height: Arc>, + ) -> Self { let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE); #[allow(clippy::arithmetic_side_effects)] Self { @@ -55,6 +66,7 @@ where poll_interval.unwrap_or(Duration::from_millis(POLLING_INTERVAL_MS)), ), source, + latest_l2_height, } } @@ -133,9 +145,14 @@ where } } -pub fn new_service( +pub fn new_da_service( da_source: S, poll_interval: Option, + latest_l2_height: Arc>, ) -> ServiceRunner> { - ServiceRunner::new(DaSourceService::new(da_source, poll_interval)) + ServiceRunner::new(DaSourceService::new( + da_source, + poll_interval, + latest_l2_height, + )) } diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs index 1c773c3a7e..a75d88e4a7 100644 --- a/crates/services/gas_price_service/src/v1/service.rs +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -1,5 +1,3 @@ -use std::num::NonZeroU64; - use crate::{ common::{ gas_price_algorithm::SharedGasPriceAlgo, @@ -60,6 +58,13 @@ use fuel_gas_price_algorithm::{ }, }; use futures::FutureExt; +use std::{ + num::NonZeroU64, + sync::{ + Arc, + Mutex, + }, +}; use tokio::sync::broadcast::Receiver; /// The service that updates the gas price algorithm. @@ -82,6 +87,8 @@ where da_block_costs_buffer: Vec, /// Storage transaction provider for metadata and unrecorded blocks storage_tx_provider: AtomicStorage, + /// communicates to the Da source service what the latest L2 block was + latest_l2_block: Arc>, } impl GasPriceServiceV1 @@ -114,6 +121,7 @@ where algorithm_updater: AlgorithmUpdaterV1, da_source_adapter_handle: ServiceRunner>, storage_tx_provider: AtomicStorage, + latest_l2_block: Arc>, ) -> Self { let da_source_channel = da_source_adapter_handle.shared.clone().subscribe(); Self { @@ -124,6 +132,7 @@ where da_source_channel, da_block_costs_buffer: Vec::new(), storage_tx_provider, + latest_l2_block, } } @@ -341,10 +350,12 @@ where mod tests { use std::{ num::NonZeroU64, - sync::Arc, + sync::{ + Arc, + Mutex, + }, time::Duration, }; - use tokio::sync::mpsc; use fuel_core_services::{ @@ -487,12 +498,14 @@ mod tests { initialize_algorithm(&config, l2_block_height, &metadata_storage).unwrap(); let notifier = Arc::new(tokio::sync::Notify::new()); + let latest_l2_block = Arc::new(Mutex::new(0u32)); let dummy_da_source = DaSourceService::new( DummyDaBlockCosts::new( Err(anyhow::anyhow!("unused at the moment")), notifier.clone(), ), None, + latest_l2_block, ); let da_service_runner = ServiceRunner::new(dummy_da_source); da_service_runner.start_and_await().await.unwrap(); @@ -566,6 +579,7 @@ mod tests { algo_updater.last_profit = 10_000; algo_updater.new_scaled_da_gas_price = 10_000_000; + let latest_l2_block = Arc::new(Mutex::new(0u32)); let notifier = Arc::new(tokio::sync::Notify::new()); let da_source = DaSourceService::new( DummyDaBlockCosts::new( @@ -578,6 +592,7 @@ mod tests { notifier.clone(), ), Some(Duration::from_millis(1)), + latest_l2_block, ); let mut watcher = StateWatcher::started(); let da_service_runner = ServiceRunner::new(da_source); @@ -660,6 +675,7 @@ mod tests { algo_updater.last_profit = 10_000; algo_updater.new_scaled_da_gas_price = 10_000_000; + let latest_l2_block = Arc::new(Mutex::new(0u32)); let notifier = Arc::new(tokio::sync::Notify::new()); let da_source = DaSourceService::new( DummyDaBlockCosts::new( @@ -672,6 +688,7 @@ mod tests { notifier.clone(), ), Some(Duration::from_millis(1)), + latest_l2_block, ); let mut watcher = StateWatcher::started(); let da_service_runner = ServiceRunner::new(da_source); diff --git a/crates/services/gas_price_service/src/v1/tests.rs b/crates/services/gas_price_service/src/v1/tests.rs index 1a20b98fb5..15c95cfb21 100644 --- a/crates/services/gas_price_service/src/v1/tests.rs +++ b/crates/services/gas_price_service/src/v1/tests.rs @@ -28,8 +28,10 @@ use crate::{ }, v1::{ algorithm::SharedV1Algorithm, + da_source_service, da_source_service::{ service::{ + new_da_service, DaBlockCostsSource, DaSourceService, }, @@ -376,8 +378,8 @@ async fn next_gas_price__affected_by_new_l2_block() { let (algo_updater, shared_algo) = initialize_algorithm(&config, height, &metadata_storage).unwrap(); let da_source = FakeDABlockCost::never_returns(); - let da_source_service = DaSourceService::new(da_source, None); - let da_service_runner = ServiceRunner::new(da_source_service); + let latest_l2_height = Arc::new(Mutex::new(0u32)); + let da_service_runner = new_da_service(da_source, None, latest_l2_height); da_service_runner.start_and_await().await.unwrap(); let mut service = GasPriceServiceV1::new( @@ -423,8 +425,8 @@ async fn run__new_l2_block_saves_old_metadata() { let algo_updater = updater_from_config(&config); let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm()); let da_source = FakeDABlockCost::never_returns(); - let da_source_service = DaSourceService::new(da_source, None); - let da_service_runner = ServiceRunner::new(da_source_service); + let latest_l2_height = Arc::new(Mutex::new(0u32)); + let da_service_runner = new_da_service(da_source, None, latest_l2_height); da_service_runner.start_and_await().await.unwrap(); let mut service = GasPriceServiceV1::new( l2_block_source, @@ -451,6 +453,52 @@ async fn run__new_l2_block_saves_old_metadata() { service.shutdown().await.unwrap(); } +#[tokio::test] +async fn run__updates_da_service_latest_l2_height() { + // given + let da_service_latest_l2_block = Arc::new(Mutex::new(0u32)); + let l2_height = 10; + let l2_block = BlockInfo::Block { + height: l2_height, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + + let config = zero_threshold_arbitrary_config(); + let inner = database(); + let algo_updater = updater_from_config(&config); + let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm()); + let da_source = FakeDABlockCost::never_returns(); + let da_service_runner = + new_da_service(da_source, None, da_service_latest_l2_block.clone()); + da_service_runner.start_and_await().await.unwrap(); + let mut service = GasPriceServiceV1::new( + l2_block_source, + shared_algo, + algo_updater, + da_service_runner, + inner, + ); + let mut watcher = StateWatcher::started(); + + // when + l2_block_sender.send(l2_block).await.unwrap(); + service.run(&mut watcher).await; + + // then + let latest_value = da_service_latest_l2_block.lock().unwrap(); + assert_eq!(*latest_value, l2_height); + + // cleanup + service.shutdown().await.unwrap(); +} + #[derive(Clone)] struct FakeSettings { settings: GasPriceSettings, @@ -691,7 +739,7 @@ async fn uninitialized_task__new__should_fail_if_cannot_fetch_metadata() { } #[tokio::test] -async fn uninitialized_task__init__starts_da_service_with_bundle_id_in_storage() { +async fn uninitialized_task__init__starts_da_service_with_recorded_height_in_storage() { // given let block_height = 100; let recorded_height: u32 = 200; diff --git a/crates/services/gas_price_service/src/v1/uninitialized_task.rs b/crates/services/gas_price_service/src/v1/uninitialized_task.rs index db77274ccc..16822aae5a 100644 --- a/crates/services/gas_price_service/src/v1/uninitialized_task.rs +++ b/crates/services/gas_price_service/src/v1/uninitialized_task.rs @@ -81,7 +81,10 @@ use fuel_gas_price_algorithm::v1::{ AlgorithmUpdaterV1, UnrecordedBlocks, }; -use std::time::Duration; +use std::{ + sync::Arc, + time::Duration, +}; pub mod fuel_storage_unrecorded_blocks; @@ -165,7 +168,9 @@ where .config .da_poll_interval .map(|x| Duration::from_millis(x.into())); - let da_service = DaSourceService::new(self.da_source, poll_duration); + let latest_l2_height = Arc::new(std::sync::Mutex::new(0)); + let da_service = + DaSourceService::new(self.da_source, poll_duration, latest_l2_height.clone()); let da_service_runner = ServiceRunner::new(da_service); da_service_runner.start_and_await().await?; @@ -176,6 +181,7 @@ where self.algo_updater, da_service_runner, self.gas_price_db, + latest_l2_height, ); Ok(service) } else { @@ -197,6 +203,7 @@ where self.algo_updater, da_service_runner, self.gas_price_db, + latest_l2_height, ); Ok(service) }