Skip to content

Commit

Permalink
WIP create latest_l2_block concept in gas price and da services
Browse files Browse the repository at this point in the history
  • Loading branch information
MitchTurner committed Dec 20, 2024
1 parent 0dc2dc7 commit 663b825
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 19 deletions.
21 changes: 17 additions & 4 deletions crates/services/gas_price_service/src/v1/da_source_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ mod tests {
use super::*;
use crate::v1::da_source_service::{
dummy_costs::DummyDaBlockCosts,
service::new_service,
service::new_da_service,
};
use fuel_core_services::Service;
use std::{
sync::Arc,
sync::{
Arc,
Mutex,
},
time::Duration,
};

Expand All @@ -43,7 +46,12 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone());
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let latest_l2_height = Arc::new(Mutex::new(0u32));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
);
let mut shared_state = &mut service.shared.subscribe();

// when
Expand All @@ -62,7 +70,12 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone());
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let latest_l2_height = Arc::new(Mutex::new(0u32));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
);
let mut shared_state = &mut service.shared.subscribe();

// when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ use fuel_core_services::{
StateWatcher,
TaskNextAction,
};
use std::time::Duration;
use std::{
sync::{
Arc,
Mutex,
},
time::Duration,
};
use tokio::{
sync::broadcast::Sender,
time::{
Expand Down Expand Up @@ -37,6 +43,7 @@ pub struct DaSourceService<Source> {
poll_interval: Interval,
source: Source,
shared_state: SharedState,
latest_l2_height: Arc<Mutex<u32>>,
}

const DA_BLOCK_COSTS_CHANNEL_SIZE: usize = 16 * 1024;
Expand All @@ -46,7 +53,11 @@ impl<Source> DaSourceService<Source>
where
Source: DaBlockCostsSource,
{
pub fn new(source: Source, poll_interval: Option<Duration>) -> Self {
pub fn new(
source: Source,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<u32>>,
) -> Self {
let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
#[allow(clippy::arithmetic_side_effects)]
Self {
Expand All @@ -55,6 +66,7 @@ where
poll_interval.unwrap_or(Duration::from_millis(POLLING_INTERVAL_MS)),
),
source,
latest_l2_height,
}
}

Expand Down Expand Up @@ -133,9 +145,14 @@ where
}
}

pub fn new_service<S: DaBlockCostsSource>(
pub fn new_da_service<S: DaBlockCostsSource>(
da_source: S,
poll_interval: Option<Duration>,
latest_l2_height: Arc<Mutex<u32>>,
) -> ServiceRunner<DaSourceService<S>> {
ServiceRunner::new(DaSourceService::new(da_source, poll_interval))
ServiceRunner::new(DaSourceService::new(
da_source,
poll_interval,
latest_l2_height,
))
}
25 changes: 21 additions & 4 deletions crates/services/gas_price_service/src/v1/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::num::NonZeroU64;

use crate::{
common::{
gas_price_algorithm::SharedGasPriceAlgo,
Expand Down Expand Up @@ -60,6 +58,13 @@ use fuel_gas_price_algorithm::{
},
};
use futures::FutureExt;
use std::{
num::NonZeroU64,
sync::{
Arc,
Mutex,
},
};
use tokio::sync::broadcast::Receiver;

/// The service that updates the gas price algorithm.
Expand All @@ -82,6 +87,8 @@ where
da_block_costs_buffer: Vec<DaBlockCosts>,
/// Storage transaction provider for metadata and unrecorded blocks
storage_tx_provider: AtomicStorage,
/// communicates to the Da source service what the latest L2 block was
latest_l2_block: Arc<Mutex<u32>>,
}

impl<L2, DA, AtomicStorage> GasPriceServiceV1<L2, DA, AtomicStorage>
Expand Down Expand Up @@ -114,6 +121,7 @@ where
algorithm_updater: AlgorithmUpdaterV1,
da_source_adapter_handle: ServiceRunner<DaSourceService<DA>>,
storage_tx_provider: AtomicStorage,
latest_l2_block: Arc<Mutex<u32>>,
) -> Self {
let da_source_channel = da_source_adapter_handle.shared.clone().subscribe();
Self {
Expand All @@ -124,6 +132,7 @@ where
da_source_channel,
da_block_costs_buffer: Vec::new(),
storage_tx_provider,
latest_l2_block,
}
}

Expand Down Expand Up @@ -341,10 +350,12 @@ where
mod tests {
use std::{
num::NonZeroU64,
sync::Arc,
sync::{
Arc,
Mutex,
},
time::Duration,
};

use tokio::sync::mpsc;

use fuel_core_services::{
Expand Down Expand Up @@ -487,12 +498,14 @@ mod tests {
initialize_algorithm(&config, l2_block_height, &metadata_storage).unwrap();

let notifier = Arc::new(tokio::sync::Notify::new());
let latest_l2_block = Arc::new(Mutex::new(0u32));
let dummy_da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Err(anyhow::anyhow!("unused at the moment")),
notifier.clone(),
),
None,
latest_l2_block,
);
let da_service_runner = ServiceRunner::new(dummy_da_source);
da_service_runner.start_and_await().await.unwrap();
Expand Down Expand Up @@ -566,6 +579,7 @@ mod tests {
algo_updater.last_profit = 10_000;
algo_updater.new_scaled_da_gas_price = 10_000_000;

let latest_l2_block = Arc::new(Mutex::new(0u32));
let notifier = Arc::new(tokio::sync::Notify::new());
let da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Expand All @@ -578,6 +592,7 @@ mod tests {
notifier.clone(),
),
Some(Duration::from_millis(1)),
latest_l2_block,
);
let mut watcher = StateWatcher::started();
let da_service_runner = ServiceRunner::new(da_source);
Expand Down Expand Up @@ -660,6 +675,7 @@ mod tests {
algo_updater.last_profit = 10_000;
algo_updater.new_scaled_da_gas_price = 10_000_000;

let latest_l2_block = Arc::new(Mutex::new(0u32));
let notifier = Arc::new(tokio::sync::Notify::new());
let da_source = DaSourceService::new(
DummyDaBlockCosts::new(
Expand All @@ -672,6 +688,7 @@ mod tests {
notifier.clone(),
),
Some(Duration::from_millis(1)),
latest_l2_block,
);
let mut watcher = StateWatcher::started();
let da_service_runner = ServiceRunner::new(da_source);
Expand Down
58 changes: 53 additions & 5 deletions crates/services/gas_price_service/src/v1/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ use crate::{
},
v1::{
algorithm::SharedV1Algorithm,
da_source_service,
da_source_service::{
service::{
new_da_service,
DaBlockCostsSource,
DaSourceService,
},
Expand Down Expand Up @@ -376,8 +378,8 @@ async fn next_gas_price__affected_by_new_l2_block() {
let (algo_updater, shared_algo) =
initialize_algorithm(&config, height, &metadata_storage).unwrap();
let da_source = FakeDABlockCost::never_returns();
let da_source_service = DaSourceService::new(da_source, None);
let da_service_runner = ServiceRunner::new(da_source_service);
let latest_l2_height = Arc::new(Mutex::new(0u32));
let da_service_runner = new_da_service(da_source, None, latest_l2_height);
da_service_runner.start_and_await().await.unwrap();

let mut service = GasPriceServiceV1::new(
Expand Down Expand Up @@ -423,8 +425,8 @@ async fn run__new_l2_block_saves_old_metadata() {
let algo_updater = updater_from_config(&config);
let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm());
let da_source = FakeDABlockCost::never_returns();
let da_source_service = DaSourceService::new(da_source, None);
let da_service_runner = ServiceRunner::new(da_source_service);
let latest_l2_height = Arc::new(Mutex::new(0u32));
let da_service_runner = new_da_service(da_source, None, latest_l2_height);
da_service_runner.start_and_await().await.unwrap();
let mut service = GasPriceServiceV1::new(
l2_block_source,
Expand All @@ -451,6 +453,52 @@ async fn run__new_l2_block_saves_old_metadata() {
service.shutdown().await.unwrap();
}

#[tokio::test]
async fn run__updates_da_service_latest_l2_height() {
// given
let da_service_latest_l2_block = Arc::new(Mutex::new(0u32));
let l2_height = 10;
let l2_block = BlockInfo::Block {
height: l2_height,
gas_used: 60,
block_gas_capacity: 100,
block_bytes: 100,
block_fees: 100,
};
let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1);
let l2_block_source = FakeL2BlockSource {
l2_block: l2_block_receiver,
};

let config = zero_threshold_arbitrary_config();
let inner = database();
let algo_updater = updater_from_config(&config);
let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm());
let da_source = FakeDABlockCost::never_returns();
let da_service_runner =
new_da_service(da_source, None, da_service_latest_l2_block.clone());
da_service_runner.start_and_await().await.unwrap();
let mut service = GasPriceServiceV1::new(
l2_block_source,
shared_algo,
algo_updater,
da_service_runner,
inner,
);
let mut watcher = StateWatcher::started();

// when
l2_block_sender.send(l2_block).await.unwrap();
service.run(&mut watcher).await;

// then
let latest_value = da_service_latest_l2_block.lock().unwrap();
assert_eq!(*latest_value, l2_height);

// cleanup
service.shutdown().await.unwrap();
}

#[derive(Clone)]
struct FakeSettings {
settings: GasPriceSettings,
Expand Down Expand Up @@ -691,7 +739,7 @@ async fn uninitialized_task__new__should_fail_if_cannot_fetch_metadata() {
}

#[tokio::test]
async fn uninitialized_task__init__starts_da_service_with_bundle_id_in_storage() {
async fn uninitialized_task__init__starts_da_service_with_recorded_height_in_storage() {
// given
let block_height = 100;
let recorded_height: u32 = 200;
Expand Down
11 changes: 9 additions & 2 deletions crates/services/gas_price_service/src/v1/uninitialized_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ use fuel_gas_price_algorithm::v1::{
AlgorithmUpdaterV1,
UnrecordedBlocks,
};
use std::time::Duration;
use std::{
sync::Arc,
time::Duration,
};

pub mod fuel_storage_unrecorded_blocks;

Expand Down Expand Up @@ -165,7 +168,9 @@ where
.config
.da_poll_interval
.map(|x| Duration::from_millis(x.into()));
let da_service = DaSourceService::new(self.da_source, poll_duration);
let latest_l2_height = Arc::new(std::sync::Mutex::new(0));
let da_service =
DaSourceService::new(self.da_source, poll_duration, latest_l2_height.clone());
let da_service_runner = ServiceRunner::new(da_service);
da_service_runner.start_and_await().await?;

Expand All @@ -176,6 +181,7 @@ where
self.algo_updater,
da_service_runner,
self.gas_price_db,
latest_l2_height,
);
Ok(service)
} else {
Expand All @@ -197,6 +203,7 @@ where
self.algo_updater,
da_service_runner,
self.gas_price_db,
latest_l2_height,
);
Ok(service)
}
Expand Down

0 comments on commit 663b825

Please sign in to comment.