Skip to content

Commit

Permalink
node.rs: Refactor trait bounds and rework visibility attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
Davidson-Souza committed Oct 10, 2023
1 parent 8662086 commit 4fcdef3
Showing 1 changed file with 27 additions and 47 deletions.
74 changes: 27 additions & 47 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,11 @@ enum PeerStatus {
Ready,
ShutingDown,
}
impl<
T: 'static + Default + NodeContext,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
> UtreexoNode<T, Chain>
impl<T, Chain> UtreexoNode<T, Chain>
where
T: 'static + Default + NodeContext,
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
{
pub fn new(
chain: Arc<Chain>,
Expand Down Expand Up @@ -224,7 +223,7 @@ where
initial_height: peer.height,
})
}
async fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> {
fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> {
if let Some(p) = self.peers.remove(&peer) {
p.channel.close();
if !p.feeler && p.state == PeerStatus::Ready {
Expand Down Expand Up @@ -359,30 +358,6 @@ where

Ok((proof, hashes, inputs))
}
pub async fn ibd_handle_headers(&mut self, headers: Vec<BlockHeader>) -> Result<(), WireError> {
if headers.is_empty() {
// Start downloading blocks
self.chain.flush()?;
self.state = NodeState::DownloadBlocks;
return Ok(());
}
self.last_headers_request = Instant::now();
trace!(
"Downloading headers at: {} hash: {}",
self.chain.get_best_block()?.0,
headers[0].block_hash()
);
for header in headers {
self.chain.accept_header(header)?;
}
let locator = self.chain.get_block_locator()?;
let peer = self
.send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE)
.await?;
self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));
Ok(())
}
async fn send_to_peer(&self, peer_id: u32, req: NodeRequest) -> Result<(), WireError> {
if let Some(peer) = &self.peers.get(&peer_id) {
if peer.state == PeerStatus::Ready {
Expand Down Expand Up @@ -446,7 +421,7 @@ where
Ok(())
}
#[inline]
pub async fn send_to_random_peer(
async fn send_to_random_peer(
&mut self,
req: NodeRequest,
required_services: ServiceFlags,
Expand Down Expand Up @@ -489,7 +464,7 @@ where
Err(WireError::NoPeerToSendRequest)
}

pub async fn init_peers(&mut self) -> Result<(), WireError> {
async fn init_peers(&mut self) -> Result<(), WireError> {
let anchors = self
.0
.address_man
Expand All @@ -506,19 +481,19 @@ where
Ok(())
}

pub async fn shutdown(&mut self) {
async fn shutdown(&mut self) {
info!("Shutting down node");
for peer in self.peer_ids.iter() {
try_and_log!(self.send_to_peer(*peer, NodeRequest::Shutdown).await);
}
try_and_log!(self.save_peers());
try_and_log!(self.chain.flush());
}
pub async fn ask_block(&mut self) -> Result<(), WireError> {
async fn ask_block(&mut self) -> Result<(), WireError> {
let blocks = self.get_blocks_to_download()?;
self.request_blocks(blocks).await
}
pub async fn handle_broadcast(&self) -> Result<(), WireError> {
async fn handle_broadcast(&self) -> Result<(), WireError> {
for (_, peer) in self.peers.iter() {
if peer.services.has(ServiceFlags::NODE_UTREEXO) {
continue;
Expand All @@ -544,7 +519,7 @@ where
}
Ok(())
}
pub async fn ask_for_addresses(&mut self) -> Result<(), WireError> {
async fn ask_for_addresses(&mut self) -> Result<(), WireError> {
let _ = self
.send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE)
.await?;
Expand Down Expand Up @@ -630,6 +605,7 @@ where
self.open_connection(feeler, peer_id, address).await;
Some(())
}
/// Opens a new connection that doesn't require a proxy.
#[allow(clippy::too_many_arguments)]
fn open_non_proxy_connection(
feeler: bool,
Expand All @@ -652,6 +628,7 @@ where
feeler,
)
}
/// Opens a connection through a socks5 interface
#[allow(clippy::too_many_arguments)]
async fn open_proxy_connection(
proxy: SocketAddr,
Expand Down Expand Up @@ -690,6 +667,9 @@ where
);
Ok(())
}
/// Creates a new outgoing connection with `address`. Connection may or may not be feeler,
/// a special connection type that is used to learn about good peers, but are not kept afer
/// handshake.
async fn open_connection(&mut self, feeler: bool, peer_id: usize, address: LocalAddress) {
let (requests_tx, requests_rx) = bounded(1024);
if let Some(ref proxy) = self.socks5 {
Expand Down Expand Up @@ -747,15 +727,14 @@ where
self.peer_id_count += 1;
}
}
impl<Chain: BlockchainInterface + UpdatableChainstate + 'static> UtreexoNode<IBDNode, Chain>

/// An IBD node, should be used to get your chainstate up-to-date with the network, but
/// returns as soon as there's no more blocks to download.
impl<Chain> UtreexoNode<IBDNode, Chain>
where
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
{
/// Processing blocks actually takes a lot of CPU time, and we need to wait until it either
/// succeed or fail before doing something else. This will hang our node up (not peers, only
/// the node) and make it do funky stuff, like timeout blocks we just got but not processed.
/// This task solves it by taking up the actual CPU time for processing blocks, while our
/// node's main loop can continue normally.
async fn handle_block(chain: &Arc<Chain>, block: UtreexoBlock) -> Result<(), WireError> {
let (proof, del_hashes, inputs) = Self::process_proof(
&block.udata.unwrap(),
Expand All @@ -777,7 +756,7 @@ where
}));
Ok(())
}
pub async fn handle_headers(&mut self, headers: Vec<BlockHeader>) -> Result<(), WireError> {
async fn handle_headers(&mut self, headers: Vec<BlockHeader>) -> Result<(), WireError> {
if headers.is_empty() {
// Start downloading blocks
self.chain.flush()?;
Expand Down Expand Up @@ -900,7 +879,7 @@ where
Ok(())
}

pub async fn handle_notification(
async fn handle_notification(
&mut self,
notification: Result<NodeNotification, async_std::channel::RecvError>,
) -> Result<(), WireError> {
Expand Down Expand Up @@ -973,7 +952,7 @@ where
}

PeerMessages::Disconnected(idx) => {
self.handle_disconnection(peer, idx).await?;
self.handle_disconnection(peer, idx)?;

if self.peer_ids.is_empty() || self.utreexo_peers.is_empty() {
self.state = NodeState::WaitingPeer;
Expand All @@ -991,9 +970,10 @@ where
}
}

impl<Chain: BlockchainInterface + UpdatableChainstate + 'static> UtreexoNode<RunningNode, Chain>
impl<Chain> UtreexoNode<RunningNode, Chain>
where
WireError: From<<Chain as BlockchainInterface>::Error>,
Chain: BlockchainInterface + UpdatableChainstate + 'static,
{
/// Returns a handle to the node interface that we can use to request data from our
/// node. This struct is thread safe, so we can use it from multiple threads and have
Expand Down Expand Up @@ -1398,7 +1378,7 @@ where
self.last_tip_update = Instant::now();
Ok(())
}
pub async fn handle_notification(
async fn handle_notification(
&mut self,
notification: Result<NodeNotification, async_std::channel::RecvError>,
) -> Result<(), WireError> {
Expand Down Expand Up @@ -1428,7 +1408,7 @@ where
}
}
PeerMessages::Disconnected(idx) => {
self.handle_disconnection(peer, idx).await?;
self.handle_disconnection(peer, idx)?;
}
PeerMessages::Addr(addresses) => {
let addresses: Vec<_> =
Expand Down

0 comments on commit 4fcdef3

Please sign in to comment.