Skip to content

Commit

Permalink
Naming UtreexoNode Fields and Traits Impl now follow the respective p…
Browse files Browse the repository at this point in the history
…osition of the field
  • Loading branch information
jaoleal committed Jan 1, 2025
1 parent 8344049 commit 3c72c6c
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 104 deletions.
2 changes: 1 addition & 1 deletion crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ mod test {
user_agent: "floresta".to_string(),
};

let chain_provider: UtreexoNode<RunningNode, Arc<ChainState<KvChainStore>>> =
let chain_provider: UtreexoNode<Arc<ChainState<KvChainStore>>, RunningNode> =
UtreexoNode::new(
u_config,
chain.clone(),
Expand Down
77 changes: 40 additions & 37 deletions crates/floresta-wire/src/p2p_wire/chain_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl NodeContext for ChainSelector {
}
}

impl<Chain> UtreexoNode<ChainSelector, Chain>
impl<Chain> UtreexoNode<Chain, ChainSelector>
where
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
Expand Down Expand Up @@ -156,15 +156,15 @@ where
self.send_to_peer(peer, NodeRequest::Shutdown).await?;

let peer = self.peers.get(&peer).unwrap();
self.0.address_man.update_set_state(
self.common.address_man.update_set_state(
peer.address_id as usize,
AddressState::Banned(ChainSelector::BAN_TIME),
);
}
}

let last = headers.last().unwrap().block_hash();
self.1
self.context
.tip_cache
.entry(peer)
.and_modify(|e| *e = last)
Expand Down Expand Up @@ -453,24 +453,24 @@ where
/// has send all blocks they have. Once all peers have finished, we just pick the
/// most PoW chain among all chains that we got
async fn empty_headers_message(&mut self, peer: PeerId) -> Result<(), WireError> {
match self.1.state {
match self.context.state {
ChainSelectorState::DownloadingHeaders => {
info!("Finished downloading headers from peer={peer}, checking if our peers agree");
self.poke_peers().await?;
self.1.state = ChainSelectorState::LookingForForks(Instant::now());
self.1.done_peers.insert(peer);
self.context.state = ChainSelectorState::LookingForForks(Instant::now());
self.context.done_peers.insert(peer);
}
ChainSelectorState::LookingForForks(_) => {
self.1.done_peers.insert(peer);
for peer in self.0.peer_ids.iter() {
self.context.done_peers.insert(peer);
for peer in self.common.peer_ids.iter() {
// at least one peer haven't finished
if !self.1.done_peers.contains(peer) {
if !self.context.done_peers.contains(peer) {
return Ok(());
}
}

if let Some(assume_utreexo) = self.0.config.assume_utreexo.as_ref() {
self.1.state = ChainSelectorState::Done;
if let Some(assume_utreexo) = self.common.config.assume_utreexo.as_ref() {
self.context.state = ChainSelectorState::Done;
// already assumed the chain
if self.chain.get_validation_index().unwrap() >= assume_utreexo.height {
return Ok(());
Expand All @@ -496,7 +496,7 @@ where
self.check_tips().await?;
}

self.1.state = ChainSelectorState::Done;
self.context.state = ChainSelectorState::Done;
}
_ => {}
}
Expand Down Expand Up @@ -552,8 +552,8 @@ where
}

async fn ban_peers_on_tip(&mut self, tip: BlockHash) -> Result<(), WireError> {
for peer in self.0.peers.clone() {
if self.1.tip_cache.get(&peer.0).copied().eq(&Some(tip)) {
for peer in self.common.peers.clone() {
if self.context.tip_cache.get(&peer.0).copied().eq(&Some(tip)) {
self.address_man.update_set_state(
peer.1.address_id as usize,
AddressState::Banned(ChainSelector::BAN_TIME),
Expand All @@ -580,7 +580,7 @@ where
self.chain.get_best_block()?.0
);

self.1.state = ChainSelectorState::Done;
self.context.state = ChainSelectorState::Done;
self.chain.mark_chain_as_assumed(acc, tips[0]).unwrap();
self.chain.toggle_ibd(false);
}
Expand All @@ -594,7 +594,7 @@ where
}

info!("chain close enough to tip, not asking for utreexo state");
self.1.state = ChainSelectorState::Done;
self.context.state = ChainSelectorState::Done;
Ok(())
}

Expand All @@ -612,7 +612,7 @@ where
self.send_to_peer(peer, NodeRequest::GetHeaders(locator))
.await?;

let peer = self.1.sync_peer;
let peer = self.context.sync_peer;
self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));

Expand All @@ -624,7 +624,7 @@ where
/// If it does, we disconnect and ban this peer
async fn check_for_timeout(&mut self) -> Result<(), WireError> {
let (failed, mut peers) = self
.0
.common
.inflight
.iter()
.filter(|(_, (_, instant))| {
Expand All @@ -635,11 +635,11 @@ where

for request in failed {
if let InflightRequests::Headers = request {
if self.1.state == ChainSelectorState::DownloadingHeaders {
if self.context.state == ChainSelectorState::DownloadingHeaders {
let new_sync_peer = rand::random::<usize>() % self.peer_ids.len();
let new_sync_peer = *self.peer_ids.get(new_sync_peer).unwrap();
self.1.sync_peer = new_sync_peer;
self.request_headers(self.chain.get_best_block()?.1, self.1.sync_peer)
self.context.sync_peer = new_sync_peer;
self.request_headers(self.chain.get_best_block()?.1, self.context.sync_peer)
.await?;
self.inflight
.insert(InflightRequests::Headers, (new_sync_peer, Instant::now()));
Expand All @@ -652,14 +652,14 @@ where
peers.dedup();

for peer in peers {
self.0.peers.entry(peer).and_modify(|e| {
self.common.peers.entry(peer).and_modify(|e| {
if e.state != PeerStatus::Awaiting {
e.state = PeerStatus::Banned;
}
});

self.send_to_peer(peer, NodeRequest::Shutdown).await?;
self.0.peers.remove(&peer);
self.common.peers.remove(&peer);
}

Ok(())
Expand All @@ -673,7 +673,7 @@ where
/// the most PoW one.
async fn poke_peers(&self) -> Result<(), WireError> {
let locator = self.chain.get_block_locator().unwrap();
for peer in self.0.peer_ids.iter() {
for peer in self.common.peer_ids.iter() {
let get_headers = NodeRequest::GetHeaders(locator.clone());
self.send_to_peer(*peer, get_headers).await?;
}
Expand All @@ -697,30 +697,33 @@ where
ChainSelector
);

if let ChainSelectorState::LookingForForks(start) = self.1.state {
if let ChainSelectorState::LookingForForks(start) = self.context.state {
if start.elapsed().as_secs() > 30 {
self.1.state = ChainSelectorState::LookingForForks(Instant::now());
self.context.state = ChainSelectorState::LookingForForks(Instant::now());
self.poke_peers().await?;
}
}

if self.1.state == ChainSelectorState::CreatingConnections {
if self.context.state == ChainSelectorState::CreatingConnections {
// If we have enough peers, try to download headers
if !self.peer_ids.is_empty() {
let new_sync_peer = rand::random::<usize>() % self.peer_ids.len();
self.1.sync_peer = *self.peer_ids.get(new_sync_peer).unwrap();
self.context.sync_peer = *self.peer_ids.get(new_sync_peer).unwrap();
try_and_log!(
self.request_headers(self.chain.get_best_block()?.1, self.1.sync_peer)
.await
self.request_headers(
self.chain.get_best_block()?.1,
self.context.sync_peer
)
.await
);

self.1.state = ChainSelectorState::DownloadingHeaders;
self.context.state = ChainSelectorState::DownloadingHeaders;
}
}

// We downloaded all headers in the most-pow chain, and all our peers agree
// this is the most-pow chain, we're done!
if self.1.state == ChainSelectorState::Done {
if self.context.state == ChainSelectorState::Done {
try_and_log!(self.chain.flush());
break;
}
Expand All @@ -740,12 +743,12 @@ where
block: BlockHash,
height: u32,
) -> Result<FindAccResult, WireError> {
for peer_id in self.0.peer_ids.iter() {
for peer_id in self.common.peer_ids.iter() {
let peer = self.peers.get(peer_id).unwrap();
if peer.services.has(ServiceFlags::from(1 << 25)) {
self.send_to_peer(*peer_id, NodeRequest::GetUtreexoState((block, height)))
.await?;
self.0.inflight.insert(
self.common.inflight.insert(
InflightRequests::UtreexoState(*peer_id),
(*peer_id, Instant::now()),
);
Expand Down Expand Up @@ -815,16 +818,16 @@ where

PeerMessages::Ready(version) => {
self.handle_peer_ready(peer, &version).await?;
if matches!(self.1.state, ChainSelectorState::LookingForForks(_)) {
if matches!(self.context.state, ChainSelectorState::LookingForForks(_)) {
let locator = self.chain.get_block_locator().unwrap();
self.send_to_peer(peer, NodeRequest::GetHeaders(locator))
.await?;
}
}

PeerMessages::Disconnected(idx) => {
if peer == self.1.sync_peer {
self.1.state = ChainSelectorState::CreatingConnections;
if peer == self.context.sync_peer {
self.context.state = ChainSelectorState::CreatingConnections;
}
self.handle_disconnection(peer, idx).await?;
}
Expand Down
Loading

0 comments on commit 3c72c6c

Please sign in to comment.