Skip to content

Commit

Permalink
refactor launch
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptoAtwill committed Oct 28, 2024
1 parent f83472d commit b862797
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 107 deletions.
87 changes: 51 additions & 36 deletions fendermint/vm/topdown/src/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::proxy::ParentQueryProxy;
use crate::syncer::{start_parent_syncer, ParentPoller, ParentSyncerConfig};
use crate::syncer::{ParentPoller, ParentSyncerConfig, ParentSyncerReactorClient};
use crate::vote::gossip::GossipClient;
use crate::vote::payload::PowerUpdates;
use crate::vote::store::InMemoryVoteStore;
use crate::vote::{start_vote_reactor, StartVoteReactorParams};
use crate::vote::{StartVoteReactorParams, VoteReactorClient};
use crate::{BlockHeight, Checkpoint, Config, TopdownClient, TopdownProposal};
use anyhow::anyhow;
use cid::Cid;
Expand All @@ -33,47 +33,62 @@ pub async fn run_topdown<CheckpointQuery, Gossip, Poller, ParentClient>(
validator_key: SecretKey,
gossip_client: Gossip,
parent_client: ParentClient,
poller_fn: impl FnOnce(&Checkpoint, ParentClient, ParentSyncerConfig) -> Poller,
poller_fn: impl FnOnce(&Checkpoint, ParentClient, ParentSyncerConfig) -> Poller + Send + 'static,
) -> anyhow::Result<TopdownClient>
where
CheckpointQuery: LaunchQuery + Send + Sync + 'static,
Gossip: GossipClient + Send + Sync + 'static,
Poller: ParentPoller + Send + Sync + 'static,
ParentClient: ParentQueryProxy + Send + Sync + 'static,
{
let query = Arc::new(query);
let checkpoint = query_starting_checkpoint(&query, &parent_client).await?;

let power_table = query_starting_committee(&query).await?;
let power_table = power_table
.into_iter()
.map(|v| {
let vk = ValidatorKey::new(v.public_key.0);
let w = v.power.0;
(vk, w)
})
.collect::<Vec<_>>();

let poller = poller_fn(&checkpoint, parent_client, config.syncer.clone());
let internal_event_rx = poller.subscribe();

let syncer_client = start_parent_syncer(config.syncer, poller)?;

let voting_client = start_vote_reactor(StartVoteReactorParams {
config: config.voting,
validator_key,
power_table,
last_finalized_height: checkpoint.target_height(),
latest_child_block: query.latest_chain_block()?,
gossip: gossip_client,
vote_store: InMemoryVoteStore::default(),
internal_event_listener: internal_event_rx,
})?;

tracing::info!(
finality = checkpoint.to_string(),
"launching parent syncer with last committed checkpoint"
);
let (syncer_client, syncer_rx) =
ParentSyncerReactorClient::new(config.syncer.request_channel_size);
let (voting_client, voting_rx) = VoteReactorClient::new(config.voting.req_channel_buffer_size);

tokio::spawn(async move {
let query = Arc::new(query);
let checkpoint = query_starting_checkpoint(&query, &parent_client)
.await
.expect("should be able to query starting checkpoint");

let power_table = query_starting_committee(&query)
.await
.expect("should be able to query starting committee");
let power_table = power_table
.into_iter()
.map(|v| {
let vk = ValidatorKey::new(v.public_key.0);
let w = v.power.0;
(vk, w)
})
.collect::<Vec<_>>();

let poller = poller_fn(&checkpoint, parent_client, config.syncer.clone());
let internal_event_rx = poller.subscribe();

ParentSyncerReactorClient::start_reactor(syncer_rx, poller, config.syncer);
VoteReactorClient::start_reactor(
voting_rx,
StartVoteReactorParams {
config: config.voting,
validator_key,
power_table,
last_finalized_height: checkpoint.target_height(),
latest_child_block: query
.latest_chain_block()
.expect("should query latest chain block"),
gossip: gossip_client,
vote_store: InMemoryVoteStore::default(),
internal_event_listener: internal_event_rx,
},
)
.expect("cannot start vote reactor");

tracing::info!(
finality = checkpoint.to_string(),
"launching parent syncer with last committed checkpoint"
);
});

Ok(TopdownClient {
syncer: syncer_client,
Expand Down
63 changes: 34 additions & 29 deletions fendermint/vm/topdown/src/syncer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,39 @@ pub struct ParentSyncerReactorClient {
tx: mpsc::Sender<ParentSyncerRequest>,
}

impl ParentSyncerReactorClient {
pub fn new(request_channel_size: usize) -> (Self, mpsc::Receiver<ParentSyncerRequest>) {
let (tx, rx) = mpsc::channel(request_channel_size);
(Self { tx }, rx)
}

pub fn start_reactor<P: ParentPoller + Send + Sync + 'static>(
mut rx: mpsc::Receiver<ParentSyncerRequest>,
mut poller: P,
config: ParentSyncerConfig,
) {
tokio::spawn(async move {
let polling_interval = config.polling_interval;

loop {
select! {
_ = tokio::time::sleep(polling_interval) => {
if let Err(e) = poller.try_poll().await {
tracing::error!(err = e.to_string(), "cannot sync with parent");
}
}
req = rx.recv() => {
let Some(req) = req else { break };
handle_request(req, &mut poller);
}
}
}

tracing::warn!("parent syncer stopped")
});
}
}

/// Polls the parent block view
#[async_trait]
pub trait ParentPoller {
Expand All @@ -69,34 +102,6 @@ pub trait ParentPoller {
) -> anyhow::Result<Vec<Option<ParentBlockView>>>;
}

pub fn start_parent_syncer<P: Send + Sync + 'static + ParentPoller>(
config: ParentSyncerConfig,
mut poller: P,
) -> anyhow::Result<ParentSyncerReactorClient> {
let (tx, mut rx) = mpsc::channel(config.request_channel_size);

tokio::spawn(async move {
let polling_interval = config.polling_interval;

loop {
select! {
_ = tokio::time::sleep(polling_interval) => {
if let Err(e) = poller.try_poll().await {
tracing::error!(err = e.to_string(), "cannot sync with parent");
}
}
req = rx.recv() => {
let Some(req) = req else { break };
handle_request(req, &mut poller);
}
}
}

tracing::warn!("parent syncer stopped")
});
Ok(ParentSyncerReactorClient { tx })
}

impl ParentSyncerReactorClient {
/// Marks the height as finalized.
/// There is no need to wait for ack from the reactor
Expand All @@ -117,7 +122,7 @@ impl ParentSyncerReactorClient {
}
}

enum ParentSyncerRequest {
pub enum ParentSyncerRequest {
/// A new parent height is finalized
Finalized(Checkpoint),
QueryParentBlockViews {
Expand Down
91 changes: 49 additions & 42 deletions fendermint/vm/topdown/src/vote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,54 @@ pub struct VoteReactorClient {
tx: mpsc::Sender<VoteReactorRequest>,
}

impl VoteReactorClient {
pub fn new(req_channel_buffer_size: usize) -> (Self, mpsc::Receiver<VoteReactorRequest>) {
let (tx, rx) = mpsc::channel(req_channel_buffer_size);
(Self { tx }, rx)
}

pub fn start_reactor<
G: GossipClient + Send + Sync + 'static,
V: VoteStore + Send + Sync + 'static,
>(
rx: mpsc::Receiver<VoteReactorRequest>,
params: StartVoteReactorParams<G, V>,
) -> anyhow::Result<()> {
let config = params.config;
let vote_tally = VoteTally::new(
params.power_table,
params.last_finalized_height,
params.vote_store,
)?;

let validator_key = params.validator_key;
let internal_event_listener = params.internal_event_listener;
let latest_child_block = params.latest_child_block;
let gossip = params.gossip;

tokio::spawn(async move {
let sleep = Duration::new(config.voting_sleep_interval_sec, 0);

let inner = VotingHandler {
validator_key,
req_rx: rx,
internal_event_listener,
vote_tally,
latest_child_block,
config,
gossip,
};
let mut machine = OperationStateMachine::new(inner);
loop {
machine = machine.step().await;
tokio::time::sleep(sleep).await;
}
});

Ok(())
}
}

pub struct StartVoteReactorParams<G, V> {
pub config: VoteConfig,
pub validator_key: SecretKey,
Expand All @@ -58,47 +106,6 @@ pub struct StartVoteReactorParams<G, V> {
pub internal_event_listener: broadcast::Receiver<TopDownSyncEvent>,
}

pub fn start_vote_reactor<
G: GossipClient + Send + Sync + 'static,
V: VoteStore + Send + Sync + 'static,
>(
params: StartVoteReactorParams<G, V>,
) -> anyhow::Result<VoteReactorClient> {
let config = params.config;
let (tx, rx) = mpsc::channel(config.req_channel_buffer_size);
let vote_tally = VoteTally::new(
params.power_table,
params.last_finalized_height,
params.vote_store,
)?;

let validator_key = params.validator_key;
let internal_event_listener = params.internal_event_listener;
let latest_child_block = params.latest_child_block;
let gossip = params.gossip;

tokio::spawn(async move {
let sleep = Duration::new(config.voting_sleep_interval_sec, 0);

let inner = VotingHandler {
validator_key,
req_rx: rx,
internal_event_listener,
vote_tally,
latest_child_block,
config,
gossip,
};
let mut machine = OperationStateMachine::new(inner);
loop {
machine = machine.step().await;
tokio::time::sleep(sleep).await;
}
});

Ok(VoteReactorClient { tx })
}

impl VoteReactorClient {
async fn request<T, F: FnOnce(oneshot::Sender<T>) -> VoteReactorRequest>(
&self,
Expand Down Expand Up @@ -184,7 +191,7 @@ impl VoteReactorClient {
}
}

enum VoteReactorRequest {
pub enum VoteReactorRequest {
/// A new child subnet block is mined, this is the fendermint block
NewLocalBlockMined(BlockHeight),
/// Query the current operation mode of the vote tally state machine
Expand Down

0 comments on commit b862797

Please sign in to comment.