diff --git a/fendermint/vm/topdown/src/launch.rs b/fendermint/vm/topdown/src/launch.rs index a47147cc8..8872bd361 100644 --- a/fendermint/vm/topdown/src/launch.rs +++ b/fendermint/vm/topdown/src/launch.rs @@ -2,11 +2,11 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::proxy::ParentQueryProxy; -use crate::syncer::{start_parent_syncer, ParentPoller, ParentSyncerConfig}; +use crate::syncer::{ParentPoller, ParentSyncerConfig, ParentSyncerReactorClient}; use crate::vote::gossip::GossipClient; use crate::vote::payload::PowerUpdates; use crate::vote::store::InMemoryVoteStore; -use crate::vote::{start_vote_reactor, StartVoteReactorParams}; +use crate::vote::{StartVoteReactorParams, VoteReactorClient}; use crate::{BlockHeight, Checkpoint, Config, TopdownClient, TopdownProposal}; use anyhow::anyhow; use cid::Cid; @@ -33,7 +33,7 @@ pub async fn run_topdown( validator_key: SecretKey, gossip_client: Gossip, parent_client: ParentClient, - poller_fn: impl FnOnce(&Checkpoint, ParentClient, ParentSyncerConfig) -> Poller, + poller_fn: impl FnOnce(&Checkpoint, ParentClient, ParentSyncerConfig) -> Poller + Send + 'static, ) -> anyhow::Result where CheckpointQuery: LaunchQuery + Send + Sync + 'static, @@ -41,39 +41,54 @@ where Poller: ParentPoller + Send + Sync + 'static, ParentClient: ParentQueryProxy + Send + Sync + 'static, { - let query = Arc::new(query); - let checkpoint = query_starting_checkpoint(&query, &parent_client).await?; - - let power_table = query_starting_committee(&query).await?; - let power_table = power_table - .into_iter() - .map(|v| { - let vk = ValidatorKey::new(v.public_key.0); - let w = v.power.0; - (vk, w) - }) - .collect::>(); - - let poller = poller_fn(&checkpoint, parent_client, config.syncer.clone()); - let internal_event_rx = poller.subscribe(); - - let syncer_client = start_parent_syncer(config.syncer, poller)?; - - let voting_client = start_vote_reactor(StartVoteReactorParams { - config: config.voting, - validator_key, - power_table, - last_finalized_height: checkpoint.target_height(), - latest_child_block: query.latest_chain_block()?, - gossip: gossip_client, - vote_store: InMemoryVoteStore::default(), - internal_event_listener: internal_event_rx, - })?; - - tracing::info!( - finality = checkpoint.to_string(), - "launching parent syncer with last committed checkpoint" - ); + let (syncer_client, syncer_rx) = + ParentSyncerReactorClient::new(config.syncer.request_channel_size); + let (voting_client, voting_rx) = VoteReactorClient::new(config.voting.req_channel_buffer_size); + + tokio::spawn(async move { + let query = Arc::new(query); + let checkpoint = query_starting_checkpoint(&query, &parent_client) + .await + .expect("should be able to query starting checkpoint"); + + let power_table = query_starting_committee(&query) + .await + .expect("should be able to query starting committee"); + let power_table = power_table + .into_iter() + .map(|v| { + let vk = ValidatorKey::new(v.public_key.0); + let w = v.power.0; + (vk, w) + }) + .collect::>(); + + let poller = poller_fn(&checkpoint, parent_client, config.syncer.clone()); + let internal_event_rx = poller.subscribe(); + + ParentSyncerReactorClient::start_reactor(syncer_rx, poller, config.syncer); + VoteReactorClient::start_reactor( + voting_rx, + StartVoteReactorParams { + config: config.voting, + validator_key, + power_table, + last_finalized_height: checkpoint.target_height(), + latest_child_block: query + .latest_chain_block() + .expect("should query latest chain block"), + gossip: gossip_client, + vote_store: InMemoryVoteStore::default(), + internal_event_listener: internal_event_rx, + }, + ) + .expect("cannot start vote reactor"); + + tracing::info!( + finality = checkpoint.to_string(), + "launching parent syncer with last committed checkpoint" + ); + }); Ok(TopdownClient { syncer: syncer_client, diff --git a/fendermint/vm/topdown/src/syncer/mod.rs b/fendermint/vm/topdown/src/syncer/mod.rs index 4683c9e14..11b4eb814 100644 --- a/fendermint/vm/topdown/src/syncer/mod.rs +++ b/fendermint/vm/topdown/src/syncer/mod.rs @@ -48,6 +48,39 @@ pub struct ParentSyncerReactorClient { tx: mpsc::Sender, } +impl ParentSyncerReactorClient { + pub fn new(request_channel_size: usize) -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(request_channel_size); + (Self { tx }, rx) + } + + pub fn start_reactor( + mut rx: mpsc::Receiver, + mut poller: P, + config: ParentSyncerConfig, + ) { + tokio::spawn(async move { + let polling_interval = config.polling_interval; + + loop { + select! { + _ = tokio::time::sleep(polling_interval) => { + if let Err(e) = poller.try_poll().await { + tracing::error!(err = e.to_string(), "cannot sync with parent"); + } + } + req = rx.recv() => { + let Some(req) = req else { break }; + handle_request(req, &mut poller); + } + } + } + + tracing::warn!("parent syncer stopped") + }); + } +} + /// Polls the parent block view #[async_trait] pub trait ParentPoller { @@ -69,34 +102,6 @@ pub trait ParentPoller { ) -> anyhow::Result>>; } -pub fn start_parent_syncer( - config: ParentSyncerConfig, - mut poller: P, -) -> anyhow::Result { - let (tx, mut rx) = mpsc::channel(config.request_channel_size); - - tokio::spawn(async move { - let polling_interval = config.polling_interval; - - loop { - select! { - _ = tokio::time::sleep(polling_interval) => { - if let Err(e) = poller.try_poll().await { - tracing::error!(err = e.to_string(), "cannot sync with parent"); - } - } - req = rx.recv() => { - let Some(req) = req else { break }; - handle_request(req, &mut poller); - } - } - } - - tracing::warn!("parent syncer stopped") - }); - Ok(ParentSyncerReactorClient { tx }) -} - impl ParentSyncerReactorClient { /// Marks the height as finalized. /// There is no need to wait for ack from the reactor @@ -117,7 +122,7 @@ impl ParentSyncerReactorClient { } } -enum ParentSyncerRequest { +pub enum ParentSyncerRequest { /// A new parent height is finalized Finalized(Checkpoint), QueryParentBlockViews { diff --git a/fendermint/vm/topdown/src/vote/mod.rs b/fendermint/vm/topdown/src/vote/mod.rs index 64e344fba..90df66503 100644 --- a/fendermint/vm/topdown/src/vote/mod.rs +++ b/fendermint/vm/topdown/src/vote/mod.rs @@ -47,6 +47,54 @@ pub struct VoteReactorClient { tx: mpsc::Sender, } +impl VoteReactorClient { + pub fn new(req_channel_buffer_size: usize) -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(req_channel_buffer_size); + (Self { tx }, rx) + } + + pub fn start_reactor< + G: GossipClient + Send + Sync + 'static, + V: VoteStore + Send + Sync + 'static, + >( + rx: mpsc::Receiver, + params: StartVoteReactorParams, + ) -> anyhow::Result<()> { + let config = params.config; + let vote_tally = VoteTally::new( + params.power_table, + params.last_finalized_height, + params.vote_store, + )?; + + let validator_key = params.validator_key; + let internal_event_listener = params.internal_event_listener; + let latest_child_block = params.latest_child_block; + let gossip = params.gossip; + + tokio::spawn(async move { + let sleep = Duration::new(config.voting_sleep_interval_sec, 0); + + let inner = VotingHandler { + validator_key, + req_rx: rx, + internal_event_listener, + vote_tally, + latest_child_block, + config, + gossip, + }; + let mut machine = OperationStateMachine::new(inner); + loop { + machine = machine.step().await; + tokio::time::sleep(sleep).await; + } + }); + + Ok(()) + } +} + pub struct StartVoteReactorParams { pub config: VoteConfig, pub validator_key: SecretKey, @@ -58,47 +106,6 @@ pub struct StartVoteReactorParams { pub internal_event_listener: broadcast::Receiver, } -pub fn start_vote_reactor< - G: GossipClient + Send + Sync + 'static, - V: VoteStore + Send + Sync + 'static, ->( - params: StartVoteReactorParams, -) -> anyhow::Result { - let config = params.config; - let (tx, rx) = mpsc::channel(config.req_channel_buffer_size); - let vote_tally = VoteTally::new( - params.power_table, - params.last_finalized_height, - params.vote_store, - )?; - - let validator_key = params.validator_key; - let internal_event_listener = params.internal_event_listener; - let latest_child_block = params.latest_child_block; - let gossip = params.gossip; - - tokio::spawn(async move { - let sleep = Duration::new(config.voting_sleep_interval_sec, 0); - - let inner = VotingHandler { - validator_key, - req_rx: rx, - internal_event_listener, - vote_tally, - latest_child_block, - config, - gossip, - }; - let mut machine = OperationStateMachine::new(inner); - loop { - machine = machine.step().await; - tokio::time::sleep(sleep).await; - } - }); - - Ok(VoteReactorClient { tx }) -} - impl VoteReactorClient { async fn request) -> VoteReactorRequest>( &self, @@ -184,7 +191,7 @@ impl VoteReactorClient { } } -enum VoteReactorRequest { +pub enum VoteReactorRequest { /// A new child subnet block is mined, this is the fendermint block NewLocalBlockMined(BlockHeight), /// Query the current operation mode of the vote tally state machine