Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Add names for UtreexoNode fields #312

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ mod test {
user_agent: "floresta".to_string(),
};

let chain_provider: UtreexoNode<RunningNode, Arc<ChainState<KvChainStore>>> =
let chain_provider: UtreexoNode<Arc<ChainState<KvChainStore>>, RunningNode> =
UtreexoNode::new(
u_config,
chain.clone(),
Expand Down
77 changes: 40 additions & 37 deletions crates/floresta-wire/src/p2p_wire/chain_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl NodeContext for ChainSelector {
}
}

impl<Chain> UtreexoNode<ChainSelector, Chain>
impl<Chain> UtreexoNode<Chain, ChainSelector>
where
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
Expand Down Expand Up @@ -156,15 +156,15 @@ where
self.send_to_peer(peer, NodeRequest::Shutdown).await?;

let peer = self.peers.get(&peer).unwrap();
self.0.address_man.update_set_state(
self.common.address_man.update_set_state(
peer.address_id as usize,
AddressState::Banned(ChainSelector::BAN_TIME),
);
}
}

let last = headers.last().unwrap().block_hash();
self.1
self.context
.tip_cache
.entry(peer)
.and_modify(|e| *e = last)
Expand Down Expand Up @@ -453,24 +453,24 @@ where
/// has send all blocks they have. Once all peers have finished, we just pick the
/// most PoW chain among all chains that we got
async fn empty_headers_message(&mut self, peer: PeerId) -> Result<(), WireError> {
match self.1.state {
match self.context.state {
ChainSelectorState::DownloadingHeaders => {
info!("Finished downloading headers from peer={peer}, checking if our peers agree");
self.poke_peers().await?;
self.1.state = ChainSelectorState::LookingForForks(Instant::now());
self.1.done_peers.insert(peer);
self.context.state = ChainSelectorState::LookingForForks(Instant::now());
self.context.done_peers.insert(peer);
}
ChainSelectorState::LookingForForks(_) => {
self.1.done_peers.insert(peer);
for peer in self.0.peer_ids.iter() {
self.context.done_peers.insert(peer);
for peer in self.common.peer_ids.iter() {
// at least one peer haven't finished
if !self.1.done_peers.contains(peer) {
if !self.context.done_peers.contains(peer) {
return Ok(());
}
}

if let Some(assume_utreexo) = self.0.config.assume_utreexo.as_ref() {
self.1.state = ChainSelectorState::Done;
if let Some(assume_utreexo) = self.common.config.assume_utreexo.as_ref() {
self.context.state = ChainSelectorState::Done;
// already assumed the chain
if self.chain.get_validation_index().unwrap() >= assume_utreexo.height {
return Ok(());
Expand All @@ -496,7 +496,7 @@ where
self.check_tips().await?;
}

self.1.state = ChainSelectorState::Done;
self.context.state = ChainSelectorState::Done;
}
_ => {}
}
Expand Down Expand Up @@ -552,8 +552,8 @@ where
}

async fn ban_peers_on_tip(&mut self, tip: BlockHash) -> Result<(), WireError> {
for peer in self.0.peers.clone() {
if self.1.tip_cache.get(&peer.0).copied().eq(&Some(tip)) {
for peer in self.common.peers.clone() {
if self.context.tip_cache.get(&peer.0).copied().eq(&Some(tip)) {
self.address_man.update_set_state(
peer.1.address_id as usize,
AddressState::Banned(ChainSelector::BAN_TIME),
Expand All @@ -580,7 +580,7 @@ where
self.chain.get_best_block()?.0
);

self.1.state = ChainSelectorState::Done;
self.context.state = ChainSelectorState::Done;
self.chain.mark_chain_as_assumed(acc, tips[0]).unwrap();
self.chain.toggle_ibd(false);
}
Expand All @@ -594,7 +594,7 @@ where
}

info!("chain close enough to tip, not asking for utreexo state");
self.1.state = ChainSelectorState::Done;
self.context.state = ChainSelectorState::Done;
Ok(())
}

Expand All @@ -612,7 +612,7 @@ where
self.send_to_peer(peer, NodeRequest::GetHeaders(locator))
.await?;

let peer = self.1.sync_peer;
let peer = self.context.sync_peer;
self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));

Expand All @@ -624,7 +624,7 @@ where
/// If it does, we disconnect and ban this peer
async fn check_for_timeout(&mut self) -> Result<(), WireError> {
let (failed, mut peers) = self
.0
.common
.inflight
.iter()
.filter(|(_, (_, instant))| {
Expand All @@ -635,11 +635,11 @@ where

for request in failed {
if let InflightRequests::Headers = request {
if self.1.state == ChainSelectorState::DownloadingHeaders {
if self.context.state == ChainSelectorState::DownloadingHeaders {
let new_sync_peer = rand::random::<usize>() % self.peer_ids.len();
let new_sync_peer = *self.peer_ids.get(new_sync_peer).unwrap();
self.1.sync_peer = new_sync_peer;
self.request_headers(self.chain.get_best_block()?.1, self.1.sync_peer)
self.context.sync_peer = new_sync_peer;
self.request_headers(self.chain.get_best_block()?.1, self.context.sync_peer)
.await?;
self.inflight
.insert(InflightRequests::Headers, (new_sync_peer, Instant::now()));
Expand All @@ -652,14 +652,14 @@ where
peers.dedup();

for peer in peers {
self.0.peers.entry(peer).and_modify(|e| {
self.common.peers.entry(peer).and_modify(|e| {
if e.state != PeerStatus::Awaiting {
e.state = PeerStatus::Banned;
}
});

self.send_to_peer(peer, NodeRequest::Shutdown).await?;
self.0.peers.remove(&peer);
self.common.peers.remove(&peer);
}

Ok(())
Expand All @@ -673,7 +673,7 @@ where
/// the most PoW one.
async fn poke_peers(&self) -> Result<(), WireError> {
let locator = self.chain.get_block_locator().unwrap();
for peer in self.0.peer_ids.iter() {
for peer in self.common.peer_ids.iter() {
let get_headers = NodeRequest::GetHeaders(locator.clone());
self.send_to_peer(*peer, get_headers).await?;
}
Expand All @@ -697,30 +697,33 @@ where
ChainSelector
);

if let ChainSelectorState::LookingForForks(start) = self.1.state {
if let ChainSelectorState::LookingForForks(start) = self.context.state {
if start.elapsed().as_secs() > 30 {
self.1.state = ChainSelectorState::LookingForForks(Instant::now());
self.context.state = ChainSelectorState::LookingForForks(Instant::now());
self.poke_peers().await?;
}
}

if self.1.state == ChainSelectorState::CreatingConnections {
if self.context.state == ChainSelectorState::CreatingConnections {
// If we have enough peers, try to download headers
if !self.peer_ids.is_empty() {
let new_sync_peer = rand::random::<usize>() % self.peer_ids.len();
self.1.sync_peer = *self.peer_ids.get(new_sync_peer).unwrap();
self.context.sync_peer = *self.peer_ids.get(new_sync_peer).unwrap();
try_and_log!(
self.request_headers(self.chain.get_best_block()?.1, self.1.sync_peer)
.await
self.request_headers(
self.chain.get_best_block()?.1,
self.context.sync_peer
)
.await
);

self.1.state = ChainSelectorState::DownloadingHeaders;
self.context.state = ChainSelectorState::DownloadingHeaders;
}
}

// We downloaded all headers in the most-pow chain, and all our peers agree
// this is the most-pow chain, we're done!
if self.1.state == ChainSelectorState::Done {
if self.context.state == ChainSelectorState::Done {
try_and_log!(self.chain.flush());
break;
}
Expand All @@ -740,12 +743,12 @@ where
block: BlockHash,
height: u32,
) -> Result<FindAccResult, WireError> {
for peer_id in self.0.peer_ids.iter() {
for peer_id in self.common.peer_ids.iter() {
let peer = self.peers.get(peer_id).unwrap();
if peer.services.has(ServiceFlags::from(1 << 25)) {
self.send_to_peer(*peer_id, NodeRequest::GetUtreexoState((block, height)))
.await?;
self.0.inflight.insert(
self.common.inflight.insert(
InflightRequests::UtreexoState(*peer_id),
(*peer_id, Instant::now()),
);
Expand Down Expand Up @@ -815,16 +818,16 @@ where

PeerMessages::Ready(version) => {
self.handle_peer_ready(peer, &version).await?;
if matches!(self.1.state, ChainSelectorState::LookingForForks(_)) {
if matches!(self.context.state, ChainSelectorState::LookingForForks(_)) {
let locator = self.chain.get_block_locator().unwrap();
self.send_to_peer(peer, NodeRequest::GetHeaders(locator))
.await?;
}
}

PeerMessages::Disconnected(idx) => {
if peer == self.1.sync_peer {
self.1.state = ChainSelectorState::CreatingConnections;
if peer == self.context.sync_peer {
self.context.state = ChainSelectorState::CreatingConnections;
}
self.handle_disconnection(peer, idx).await?;
}
Expand Down
Loading
Loading