diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 0bb2bb38383..2c5ba1da41c 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -192,7 +192,7 @@ pub fn init_sub_services( tracing::debug!("da_committer_url: {:?}", config.da_committer_url); let committer_api = BlockCommitterHttpApi::new(config.da_committer_url.clone()); - let da_source = BlockCommitterDaBlockCosts::new(committer_api, None); + let da_source = BlockCommitterDaBlockCosts::new(committer_api); let v1_config = GasPriceServiceConfig::from(config.clone()) .v1() .ok_or(anyhow!( diff --git a/crates/services/gas_price_service/src/common/utils.rs b/crates/services/gas_price_service/src/common/utils.rs index b57fc2608a2..f14727bfad9 100644 --- a/crates/services/gas_price_service/src/common/utils.rs +++ b/crates/services/gas_price_service/src/common/utils.rs @@ -26,7 +26,7 @@ pub enum Error { pub type Result = core::result::Result; // Info required about the l2 block for the gas price algorithm -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum BlockInfo { // The genesis block of the L2 chain GenesisBlock, diff --git a/crates/services/gas_price_service/src/v1/da_source_service.rs b/crates/services/gas_price_service/src/v1/da_source_service.rs index 0c90b9a36d3..ad4bd1debfe 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service.rs @@ -23,14 +23,30 @@ mod tests { use super::*; use crate::v1::da_source_service::{ dummy_costs::DummyDaBlockCosts, - service::new_service, + service::{ + new_da_service, + DaSourceService, + DA_BLOCK_COSTS_CHANNEL_SIZE, + }, }; - use fuel_core_services::Service; + use fuel_core_services::{ + RunnableTask, + Service, + StateWatcher, + }; + use fuel_core_types::fuel_types::BlockHeight; use std::{ - sync::Arc, + sync::{ + Arc, + Mutex, + }, time::Duration, }; + fn latest_l2_height(height: u32) -> Arc> { + Arc::new(Mutex::new(BlockHeight::new(height))) + } + #[tokio::test] async fn run__when_da_block_cost_source_gives_value_shared_state_is_updated() { // given @@ -43,7 +59,12 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone()); - let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); + let latest_l2_height = Arc::new(Mutex::new(BlockHeight::new(10u32))); + let service = new_da_service( + da_block_costs_source, + Some(Duration::from_millis(1)), + latest_l2_height, + ); let mut shared_state = &mut service.shared.subscribe(); // when @@ -62,7 +83,12 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone()); - let service = new_service(da_block_costs_source, Some(Duration::from_millis(1))); + let latest_l2_height = latest_l2_height(0); + let service = new_da_service( + da_block_costs_source, + Some(Duration::from_millis(1)), + latest_l2_height, + ); let mut shared_state = &mut service.shared.subscribe(); // when @@ -74,4 +100,112 @@ mod tests { assert!(da_block_costs_res.is_err()); service.stop_and_await().await.unwrap(); } + + #[tokio::test] + async fn run__will_not_return_cost_bundles_for_bundles_that_are_greater_than_l2_height( + ) { + // given + let l2_height = 4; + let unexpected_costs = DaBlockCosts { + bundle_id: 1, + l2_blocks: 0..=9, + bundle_size_bytes: 1024 * 128, + blob_cost_wei: 2, + }; + assert!(unexpected_costs.l2_blocks.end() > &l2_height); + let notifier = Arc::new(tokio::sync::Notify::new()); + let da_block_costs_source = + DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone()); + let latest_l2_height = latest_l2_height(l2_height); + let service = new_da_service( + da_block_costs_source, + Some(Duration::from_millis(1)), + latest_l2_height, + ); + let mut shared_state = &mut service.shared.subscribe(); + + // when + service.start_and_await().await.unwrap(); + notifier.notified().await; + + // then + let err = shared_state.try_recv(); + tracing::info!("err: {:?}", err); + assert!(err.is_err()); + } + + #[tokio::test] + async fn run__filtered_da_block_costs_do_not_update_latest_recorded_block() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .try_init(); + + // given + let l2_height = 4; + let unexpected_costs = DaBlockCosts { + bundle_id: 1, + l2_blocks: 2..=9, + bundle_size_bytes: 1024 * 128, + blob_cost_wei: 2, + }; + assert!(unexpected_costs.l2_blocks.end() > &l2_height); + let notifier = Arc::new(tokio::sync::Notify::new()); + let da_block_costs_source = + DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone()); + let latest_l2_height = latest_l2_height(l2_height); + let mut service = DaSourceService::new( + da_block_costs_source, + Some(Duration::from_millis(1)), + latest_l2_height, + None, + ); + let mut watcher = StateWatcher::started(); + + // when + let _ = service.run(&mut watcher).await; + + // then + let recorded_height = service.recorded_height(); + let expected = 1; + assert!(recorded_height.is_none()) + } + + #[tokio::test] + async fn run__recorded_height_updated_by_da_costs() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .try_init(); + + // given + let l2_height = 10; + let recorded_height = 9; + let unexpected_costs = DaBlockCosts { + bundle_id: 1, + l2_blocks: 2..=recorded_height, + bundle_size_bytes: 1024 * 128, + blob_cost_wei: 2, + }; + let notifier = Arc::new(tokio::sync::Notify::new()); + let da_block_costs_source = + DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone()); + let latest_l2_height = latest_l2_height(l2_height); + let (sender, mut receiver) = + tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE); + let mut service = DaSourceService::new_with_sender( + da_block_costs_source, + Some(Duration::from_millis(1)), + latest_l2_height, + None, + sender, + ); + let mut watcher = StateWatcher::started(); + + // when + let next = service.run(&mut watcher).await; + + // then + let actual = service.recorded_height().unwrap(); + let expected = BlockHeight::from(recorded_height); + assert_eq!(expected, actual); + } } diff --git a/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs index fb6077472c4..9c6d71cdb0f 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs @@ -33,7 +33,6 @@ pub trait BlockCommitterApi: Send + Sync { /// which receives data from the block committer (only http api for now) pub struct BlockCommitterDaBlockCosts { client: BlockCommitter, - last_recorded_height: Option, } #[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)] @@ -79,14 +78,8 @@ impl From for DaBlockCosts { impl BlockCommitterDaBlockCosts { /// Create a new instance of the block committer da block costs source - pub fn new( - client: BlockCommitter, - last_recorded_height: Option, - ) -> Self { - Self { - client, - last_recorded_height, - } + pub fn new(client: BlockCommitter) -> Self { + Self { client } } } @@ -95,32 +88,27 @@ impl DaBlockCostsSource for BlockCommitterDaBlockCosts DaBlockCostsResult> { - let raw_da_block_costs: Vec<_> = - match self.last_recorded_height.and_then(|x| x.succ()) { - Some(ref next_height) => { - self.client - .get_costs_by_l2_block_number(*next_height.deref()) - .await? - } - None => self.client.get_latest_costs().await?.into_iter().collect(), - }; + async fn request_da_block_costs( + &mut self, + last_recorded_height: &Option, + ) -> DaBlockCostsResult> { + let raw_da_block_costs: Vec<_> = match last_recorded_height.and_then(|x| x.succ()) + { + Some(ref next_height) => { + self.client + .get_costs_by_l2_block_number(*next_height.deref()) + .await? + } + None => self.client.get_latest_costs().await?.into_iter().collect(), + }; tracing::info!("raw_da_block_costs: {:?}", raw_da_block_costs); let da_block_costs: Vec<_> = raw_da_block_costs.iter().map(DaBlockCosts::from).collect(); tracing::info!("da_block_costs: {:?}", da_block_costs); - if let Some(cost) = raw_da_block_costs.last() { - self.last_recorded_height = Some(BlockHeight::from(cost.end_height)); - } Ok(da_block_costs) } - - async fn set_last_value(&mut self, height: BlockHeight) -> DaBlockCostsResult<()> { - self.last_recorded_height = Some(height); - Ok(()) - } } pub struct BlockCommitterHttpApi { @@ -510,10 +498,10 @@ mod tests { let da_block_costs = test_da_block_costs(); let expected = vec![(&da_block_costs).into()]; let mock_api = MockBlockCommitterApi::new(Some(da_block_costs)); - let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None); + let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api); // when - let actual = block_committer.request_da_block_costs().await.unwrap(); + let actual = block_committer.request_da_block_costs(&None).await.unwrap(); // then assert_eq!(actual, expected); @@ -527,11 +515,13 @@ mod tests { let da_block_costs_len = da_block_costs.end_height - da_block_costs.start_height; let mock_api = MockBlockCommitterApi::new(Some(da_block_costs.clone())); let latest_height = BlockHeight::new(da_block_costs.end_height); - let mut block_committer = - BlockCommitterDaBlockCosts::new(mock_api, Some(latest_height)); + let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api); // when - let actual = block_committer.request_da_block_costs().await.unwrap(); + let actual = block_committer + .request_da_block_costs(&Some(latest_height)) + .await + .unwrap(); // then let l2_blocks = actual.first().unwrap().l2_blocks.clone(); diff --git a/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs b/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs index 8b5094d12e4..82c352d975e 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/dummy_costs.rs @@ -23,7 +23,10 @@ impl DummyDaBlockCosts { #[async_trait::async_trait] impl DaBlockCostsSource for DummyDaBlockCosts { - async fn request_da_block_costs(&mut self) -> DaBlockCostsResult> { + async fn request_da_block_costs( + &mut self, + _latest_recorded_height: &Option, + ) -> DaBlockCostsResult> { match &self.value { Ok(da_block_costs) => { self.notifier.notify_waiters(); @@ -35,8 +38,4 @@ impl DaBlockCostsSource for DummyDaBlockCosts { } } } - - async fn set_last_value(&mut self, _height: BlockHeight) -> DaBlockCostsResult<()> { - unimplemented!("This is a dummy implementation"); - } } diff --git a/crates/services/gas_price_service/src/v1/da_source_service/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs index 675d82cda55..e60b22907cb 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -5,7 +5,13 @@ use fuel_core_services::{ StateWatcher, TaskNextAction, }; -use std::time::Duration; +use std::{ + sync::{ + Arc, + Mutex, + }, + time::Duration, +}; use tokio::{ sync::broadcast::Sender, time::{ @@ -37,16 +43,23 @@ pub struct DaSourceService { poll_interval: Interval, source: Source, shared_state: SharedState, + latest_l2_height: Arc>, + recorded_height: Option, } -const DA_BLOCK_COSTS_CHANNEL_SIZE: usize = 16 * 1024; +pub(crate) const DA_BLOCK_COSTS_CHANNEL_SIZE: usize = 16 * 1024; const POLLING_INTERVAL_MS: u64 = 10_000; impl DaSourceService where Source: DaBlockCostsSource, { - pub fn new(source: Source, poll_interval: Option) -> Self { + pub fn new( + source: Source, + poll_interval: Option, + latest_l2_height: Arc>, + recorded_height: Option, + ) -> Self { let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE); #[allow(clippy::arithmetic_side_effects)] Self { @@ -55,26 +68,87 @@ where poll_interval.unwrap_or(Duration::from_millis(POLLING_INTERVAL_MS)), ), source, + latest_l2_height, + recorded_height, + } + } + + #[cfg(test)] + pub fn new_with_sender( + source: Source, + poll_interval: Option, + latest_l2_height: Arc>, + recorded_height: Option, + sender: Sender, + ) -> Self { + Self { + shared_state: SharedState::new(sender), + poll_interval: interval( + poll_interval.unwrap_or(Duration::from_millis(POLLING_INTERVAL_MS)), + ), + source, + latest_l2_height, + recorded_height, } } async fn process_block_costs(&mut self) -> Result<()> { - let da_block_costs_res = self.source.request_da_block_costs().await; + let da_block_costs_res = self + .source + .request_da_block_costs(&self.recorded_height) + .await; tracing::debug!("Received block costs: {:?}", da_block_costs_res); let da_block_costs = da_block_costs_res?; - for da_block_costs in da_block_costs { + let filtered_block_costs = self + .filter_costs_that_have_values_greater_than_l2_block_height(da_block_costs)?; + tracing::debug!( + "the latest l2 height is: {:?}", + *self.latest_l2_height.lock().unwrap() + ); + for da_block_costs in filtered_block_costs { + tracing::debug!("Sending block costs: {:?}", da_block_costs); + let end = BlockHeight::from(*da_block_costs.l2_blocks.end()); self.shared_state.0.send(da_block_costs)?; + if let Some(recorded_height) = self.recorded_height { + if end > recorded_height { + self.recorded_height = Some(end) + } + } else { + self.recorded_height = Some(end) + } } Ok(()) } + + fn filter_costs_that_have_values_greater_than_l2_block_height( + &self, + da_block_costs: Vec, + ) -> Result> { + let latest_l2_height = *self + .latest_l2_height + .lock() + .map_err(|err| anyhow::anyhow!("lock error: {:?}", err))?; + let iter = da_block_costs.into_iter().filter(move |da_block_costs| { + let end = BlockHeight::from(*da_block_costs.l2_blocks.end()); + end < latest_l2_height + }); + Ok(iter) + } + + #[cfg(test)] + pub fn recorded_height(&self) -> Option { + self.recorded_height + } } /// This trait is implemented by the sources to obtain the /// da block costs in a way they see fit #[async_trait::async_trait] pub trait DaBlockCostsSource: Send + Sync { - async fn request_da_block_costs(&mut self) -> Result>; - async fn set_last_value(&mut self, block_height: BlockHeight) -> Result<()>; + async fn request_da_block_costs( + &mut self, + recorded_height: &Option, + ) -> Result>; } #[async_trait::async_trait] @@ -112,7 +186,6 @@ where /// This function polls the source according to a polling interval /// described by the DaBlockCostsService async fn run(&mut self, state_watcher: &mut StateWatcher) -> TaskNextAction { - tracing::debug!("111111111111111111111111111111111"); tokio::select! { biased; _ = state_watcher.while_started() => { @@ -133,9 +206,15 @@ where } } -pub fn new_service( +pub fn new_da_service( da_source: S, poll_interval: Option, + latest_l2_height: Arc>, ) -> ServiceRunner> { - ServiceRunner::new(DaSourceService::new(da_source, poll_interval)) + ServiceRunner::new(DaSourceService::new( + da_source, + poll_interval, + latest_l2_height, + None, + )) } diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs index 602b086539b..3069226f836 100644 --- a/crates/services/gas_price_service/src/v1/service.rs +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -1,5 +1,3 @@ -use std::num::NonZeroU64; - use crate::{ common::{ gas_price_algorithm::SharedGasPriceAlgo, @@ -60,6 +58,13 @@ use fuel_gas_price_algorithm::{ }, }; use futures::FutureExt; +use std::{ + num::NonZeroU64, + sync::{ + Arc, + Mutex, + }, +}; use tokio::sync::broadcast::Receiver; /// The service that updates the gas price algorithm. @@ -82,6 +87,8 @@ where da_block_costs_buffer: Vec, /// Storage transaction provider for metadata and unrecorded blocks storage_tx_provider: AtomicStorage, + /// communicates to the Da source service what the latest L2 block was + latest_l2_block: Arc>, } impl GasPriceServiceV1 @@ -99,6 +106,23 @@ where tracing::debug!("Updating gas price algorithm"); self.apply_block_info_to_gas_algorithm(block).await?; + + self.notify_da_source_service_l2_block(block)?; + Ok(()) + } + + fn notify_da_source_service_l2_block(&self, block: BlockInfo) -> anyhow::Result<()> { + tracing::debug!("Notifying the Da source service of the latest L2 block"); + match block { + BlockInfo::GenesisBlock => {} + BlockInfo::Block { height, .. } => { + let mut latest_l2_block = self + .latest_l2_block + .lock() + .map_err(|err| anyhow!("Error locking latest L2 block: {:?}", err))?; + *latest_l2_block = BlockHeight::from(height); + } + } Ok(()) } } @@ -114,6 +138,7 @@ where algorithm_updater: AlgorithmUpdaterV1, da_source_adapter_handle: ServiceRunner>, storage_tx_provider: AtomicStorage, + latest_l2_block: Arc>, ) -> Self { let da_source_channel = da_source_adapter_handle.shared.clone().subscribe(); Self { @@ -124,6 +149,7 @@ where da_source_channel, da_block_costs_buffer: Vec::new(), storage_tx_provider, + latest_l2_block, } } @@ -341,10 +367,12 @@ where mod tests { use std::{ num::NonZeroU64, - sync::Arc, + sync::{ + Arc, + Mutex, + }, time::Duration, }; - use tokio::sync::mpsc; use fuel_core_services::{ @@ -448,6 +476,9 @@ mod tests { fn database() -> StorageTransaction> { InMemoryStorage::default().into_transaction() } + fn latest_l2_height(height: u32) -> Arc> { + Arc::new(Mutex::new(BlockHeight::new(height))) + } #[tokio::test] async fn run__updates_gas_price_with_l2_block_source() { @@ -489,15 +520,19 @@ mod tests { initialize_algorithm(&config, l2_block_height, &metadata_storage).unwrap(); let notifier = Arc::new(tokio::sync::Notify::new()); + let latest_l2_block = Arc::new(Mutex::new(BlockHeight::new(0))); let dummy_da_source = DaSourceService::new( DummyDaBlockCosts::new( Err(anyhow::anyhow!("unused at the moment")), notifier.clone(), ), None, + latest_l2_block, + None, ); let da_service_runner = ServiceRunner::new(dummy_da_source); da_service_runner.start_and_await().await.unwrap(); + let latest_l2_height = latest_l2_height(0); let mut service = GasPriceServiceV1::new( l2_block_source, @@ -505,6 +540,7 @@ mod tests { algo_updater, da_service_runner, inner, + latest_l2_height, ); let read_algo = service.next_block_algorithm(); let mut watcher = StateWatcher::default(); @@ -523,7 +559,7 @@ mod tests { #[tokio::test] async fn run__updates_gas_price_with_da_block_cost_source() { // given - let block_height = 2; + let block_height = 3; let l2_block_2 = BlockInfo::Block { height: block_height, gas_used: 60, @@ -568,6 +604,7 @@ mod tests { algo_updater.last_profit = 10_000; algo_updater.new_scaled_da_gas_price = 10_000_000; + let latest_l2_block = latest_l2_height(block_height - 1); let notifier = Arc::new(tokio::sync::Notify::new()); let da_source = DaSourceService::new( DummyDaBlockCosts::new( @@ -580,6 +617,8 @@ mod tests { notifier.clone(), ), Some(Duration::from_millis(1)), + latest_l2_block.clone(), + None, ); let mut watcher = StateWatcher::started(); let da_service_runner = ServiceRunner::new(da_source); @@ -591,16 +630,19 @@ mod tests { algo_updater, da_service_runner, inner, + latest_l2_block, ); let read_algo = service.next_block_algorithm(); let initial_price = read_algo.next_gas_price(); - service.run(&mut watcher).await; + let next = service.run(&mut watcher).await; + tracing::info!("Next action: {:?}", next); tokio::time::sleep(Duration::from_millis(3)).await; l2_block_sender.send(l2_block_2).await.unwrap(); // when - service.run(&mut watcher).await; + let next = service.run(&mut watcher).await; + tracing::info!("Next action 2: {:?}", next); tokio::time::sleep(Duration::from_millis(3)).await; service.shutdown().await.unwrap(); @@ -662,6 +704,7 @@ mod tests { algo_updater.last_profit = 10_000; algo_updater.new_scaled_da_gas_price = 10_000_000; + let latest_l2_height = latest_l2_height(block_height - 1); let notifier = Arc::new(tokio::sync::Notify::new()); let da_source = DaSourceService::new( DummyDaBlockCosts::new( @@ -674,6 +717,8 @@ mod tests { notifier.clone(), ), Some(Duration::from_millis(1)), + latest_l2_height.clone(), + None, ); let mut watcher = StateWatcher::started(); let da_service_runner = ServiceRunner::new(da_source); @@ -685,6 +730,7 @@ mod tests { algo_updater, da_service_runner, inner, + latest_l2_height, ); let read_algo = service.next_block_algorithm(); let initial_price = read_algo.next_gas_price(); diff --git a/crates/services/gas_price_service/src/v1/tests.rs b/crates/services/gas_price_service/src/v1/tests.rs index f43671b3420..04297a18fa8 100644 --- a/crates/services/gas_price_service/src/v1/tests.rs +++ b/crates/services/gas_price_service/src/v1/tests.rs @@ -28,8 +28,10 @@ use crate::{ }, v1::{ algorithm::SharedV1Algorithm, + da_source_service, da_source_service::{ service::{ + new_da_service, DaBlockCostsSource, DaSourceService, }, @@ -126,13 +128,13 @@ impl L2BlockSource for FakeL2BlockSource { } struct FakeMetadata { - inner: Arc>>, + inner: Arc>>, } impl FakeMetadata { fn empty() -> Self { Self { - inner: Arc::new(std::sync::Mutex::new(None)), + inner: Arc::new(Mutex::new(None)), } } } @@ -239,7 +241,7 @@ impl AsUnrecordedBlocks for UnimplementedStorageTx { struct FakeDABlockCost { da_block_costs: Receiver, - latest_received_height: Arc>>, + latest_requested_height: Arc>>, } impl FakeDABlockCost { @@ -247,14 +249,14 @@ impl FakeDABlockCost { let (_sender, receiver) = tokio::sync::mpsc::channel(1); Self { da_block_costs: receiver, - latest_received_height: Arc::new(Mutex::new(None)), + latest_requested_height: Arc::new(Mutex::new(None)), } } fn new(da_block_costs: Receiver) -> Self { Self { da_block_costs, - latest_received_height: Arc::new(Mutex::new(None)), + latest_requested_height: Arc::new(Mutex::new(None)), } } @@ -264,7 +266,7 @@ impl FakeDABlockCost { let height = Arc::new(Mutex::new(None)); let service = Self { da_block_costs: receiver, - latest_received_height: height.clone(), + latest_requested_height: height.clone(), }; (service, height) } @@ -272,15 +274,14 @@ impl FakeDABlockCost { #[async_trait::async_trait] impl DaBlockCostsSource for FakeDABlockCost { - async fn request_da_block_costs(&mut self) -> Result> { + async fn request_da_block_costs( + &mut self, + latest_recorded_height: &Option, + ) -> Result> { + *self.latest_requested_height.lock().unwrap() = *latest_recorded_height; let costs = self.da_block_costs.recv().await.unwrap(); Ok(vec![costs]) } - - async fn set_last_value(&mut self, height: BlockHeight) -> Result<()> { - self.latest_received_height.lock().unwrap().replace(height); - Ok(()) - } } fn zero_threshold_arbitrary_config() -> V1AlgorithmConfig { @@ -353,6 +354,9 @@ fn gas_price_database_with_metadata( tx.commit().unwrap(); db } +fn latest_l2_height(height: u32) -> Arc> { + Arc::new(Mutex::new(BlockHeight::new(height))) +} #[tokio::test] async fn next_gas_price__affected_by_new_l2_block() { @@ -376,8 +380,8 @@ async fn next_gas_price__affected_by_new_l2_block() { let (algo_updater, shared_algo) = initialize_algorithm(&config, height, &metadata_storage).unwrap(); let da_source = FakeDABlockCost::never_returns(); - let da_source_service = DaSourceService::new(da_source, None); - let da_service_runner = ServiceRunner::new(da_source_service); + let latest_l2_height = latest_l2_height(0); + let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone()); da_service_runner.start_and_await().await.unwrap(); let mut service = GasPriceServiceV1::new( @@ -386,6 +390,7 @@ async fn next_gas_price__affected_by_new_l2_block() { algo_updater, da_service_runner, inner, + latest_l2_height, ); let read_algo = service.next_block_algorithm(); @@ -423,8 +428,8 @@ async fn run__new_l2_block_saves_old_metadata() { let algo_updater = updater_from_config(&config); let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm()); let da_source = FakeDABlockCost::never_returns(); - let da_source_service = DaSourceService::new(da_source, None); - let da_service_runner = ServiceRunner::new(da_source_service); + let latest_l2_height = latest_l2_height(0); + let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone()); da_service_runner.start_and_await().await.unwrap(); let mut service = GasPriceServiceV1::new( l2_block_source, @@ -432,6 +437,7 @@ async fn run__new_l2_block_saves_old_metadata() { algo_updater, da_service_runner, inner, + latest_l2_height, ); let mut watcher = StateWatcher::started(); @@ -451,6 +457,51 @@ async fn run__new_l2_block_saves_old_metadata() { service.shutdown().await.unwrap(); } +#[tokio::test] +async fn run__updates_da_service_latest_l2_height() { + // given + let da_service_latest_l2_block = Arc::new(Mutex::new(0u32)); + let l2_height = 10; + let l2_block = BlockInfo::Block { + height: l2_height, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + + let config = zero_threshold_arbitrary_config(); + let inner = database(); + let mut algo_updater = updater_from_config(&config); + algo_updater.l2_block_height = l2_height - 1; + let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm()); + let da_source = FakeDABlockCost::never_returns(); + let latest_l2_height = latest_l2_height(0); + let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone()); + da_service_runner.start_and_await().await.unwrap(); + let mut service = GasPriceServiceV1::new( + l2_block_source, + shared_algo, + algo_updater, + da_service_runner, + inner, + latest_l2_height.clone(), + ); + let mut watcher = StateWatcher::started(); + + // when + l2_block_sender.send(l2_block).await.unwrap(); + let _ = service.run(&mut watcher).await; + + // then + let latest_value = *latest_l2_height.lock().unwrap(); + assert_eq!(*latest_value, l2_height); +} + #[derive(Clone)] struct FakeSettings { settings: GasPriceSettings, @@ -691,13 +742,13 @@ async fn uninitialized_task__new__should_fail_if_cannot_fetch_metadata() { } #[tokio::test] -async fn uninitialized_task__init__starts_da_service_with_bundle_id_in_storage() { +async fn uninitialized_task__init__starts_da_service_with_recorded_height_in_storage() { // given let block_height = 100; let recorded_height: u32 = 200; let original_metadata = arbitrary_metadata(); - let different_config = different_arb_config(); + let mut different_config = different_arb_config(); let descaleed_exec_price = original_metadata.new_scaled_exec_price / original_metadata.gas_price_factor; assert_ne!(different_config.new_exec_gas_price, descaleed_exec_price); @@ -705,13 +756,14 @@ async fn uninitialized_task__init__starts_da_service_with_bundle_id_in_storage() let settings = FakeSettings::default(); let block_stream = empty_block_stream(); let on_chain_db = FakeOnChainDb::new(different_l2_block); - let (da_cost_source, recorded_block_height_handle) = + let (da_cost_source, latest_requested_recorded_height) = FakeDABlockCost::never_returns_with_handle_to_last_height(); let mut inner = gas_price_database_with_metadata(&original_metadata); let mut tx = inner.begin_transaction().unwrap(); tx.set_recorded_height(BlockHeight::from(recorded_height)) .unwrap(); StorageTransaction::commit_transaction(tx).unwrap(); + different_config.da_poll_interval = Some(1); let service = UninitializedTask::new( different_config.clone(), Some(block_height.into()), @@ -725,10 +777,11 @@ async fn uninitialized_task__init__starts_da_service_with_bundle_id_in_storage() .unwrap(); // when - service.init(&StateWatcher::started()).await.unwrap(); + let _gas_price_service = service.init(&StateWatcher::started()).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(3)).await; // then - let actual = recorded_block_height_handle.lock().unwrap(); + let actual = latest_requested_recorded_height.lock().unwrap(); let expected = Some(BlockHeight::new(recorded_height)); assert_eq!(*actual, expected); } diff --git a/crates/services/gas_price_service/src/v1/uninitialized_task.rs b/crates/services/gas_price_service/src/v1/uninitialized_task.rs index db77274ccca..a5330b2e99b 100644 --- a/crates/services/gas_price_service/src/v1/uninitialized_task.rs +++ b/crates/services/gas_price_service/src/v1/uninitialized_task.rs @@ -81,7 +81,10 @@ use fuel_gas_price_algorithm::v1::{ AlgorithmUpdaterV1, UnrecordedBlocks, }; -use std::time::Duration; +use std::{ + sync::Arc, + time::Duration, +}; pub mod fuel_storage_unrecorded_blocks; @@ -157,15 +160,18 @@ where self.block_stream, ); - if let Some(last_recorded_height) = self.gas_price_db.get_recorded_height()? { - self.da_source.set_last_value(last_recorded_height).await?; - tracing::info!("Set last recorded height to {}", last_recorded_height); - } + let recorded_height = self.gas_price_db.get_recorded_height()?; let poll_duration = self .config .da_poll_interval .map(|x| Duration::from_millis(x.into())); - let da_service = DaSourceService::new(self.da_source, poll_duration); + let latest_l2_height = Arc::new(std::sync::Mutex::new(BlockHeight::new(0))); + let da_service = DaSourceService::new( + self.da_source, + poll_duration, + latest_l2_height.clone(), + recorded_height, + ); let da_service_runner = ServiceRunner::new(da_service); da_service_runner.start_and_await().await?; @@ -176,6 +182,7 @@ where self.algo_updater, da_service_runner, self.gas_price_db, + latest_l2_height, ); Ok(service) } else { @@ -197,6 +204,7 @@ where self.algo_updater, da_service_runner, self.gas_price_db, + latest_l2_height, ); Ok(service) } diff --git a/crates/services/src/service.rs b/crates/services/src/service.rs index 93b1b179ff6..2964841d6f6 100644 --- a/crates/services/src/service.rs +++ b/crates/services/src/service.rs @@ -84,6 +84,7 @@ pub trait RunnableService: Send { } /// The result of a single iteration of the service task +#[derive(Debug)] pub enum TaskNextAction { /// Request the task to be run again Continue, diff --git a/tests/tests/gas_price.rs b/tests/tests/gas_price.rs index a86c5d5a67d..cdc3809990e 100644 --- a/tests/tests/gas_price.rs +++ b/tests/tests/gas_price.rs @@ -640,6 +640,11 @@ fn produce_block__l1_committed_block_affects_gas_price() { .await .unwrap(); tokio::time::sleep(Duration::from_millis(20)).await; + // Won't accept DA costs until l2_height is > 1 + driver.client.produce_blocks(1, None).await.unwrap(); + // Wait for DaBlockCosts to be accepted + tokio::time::sleep(Duration::from_millis(2)).await; + // Produce new block to update gas price driver.client.produce_blocks(1, None).await.unwrap(); tokio::time::sleep(Duration::from_millis(20)).await; driver.client.estimate_gas_price(0).await.unwrap().gas_price