diff --git a/src/daemon.rs b/src/daemon.rs index df221738e..251ae7979 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -10,6 +10,7 @@ use serde_json::{json, value::RawValue, Value}; use std::fs::File; use std::io::Read; +use std::ops::ControlFlow; use std::path::Path; use crate::{ @@ -275,10 +276,10 @@ impl Daemon { self.p2p.lock().get_new_headers(chain) } - pub(crate) fn for_blocks(&self, blockhashes: B, func: F) -> Result<()> + pub(crate) fn for_blocks(&self, blockhashes: B, func: F) -> Result> where B: IntoIterator, - F: FnMut(BlockHash, SerBlock), + F: FnMut(BlockHash, SerBlock) -> ControlFlow, { self.p2p.lock().for_blocks(blockhashes, func) } diff --git a/src/index.rs b/src/index.rs index e4d2a6ae8..f2e059700 100644 --- a/src/index.rs +++ b/src/index.rs @@ -216,6 +216,7 @@ impl Index { index_single_block(blockhash, block, height, &mut batch); }); self.stats.height.set("tip", height as f64); + ControlFlow::Continue::<()>(()) })?; let heights: Vec<_> = heights.collect(); assert!( diff --git a/src/p2p.rs b/src/p2p.rs index 3922f2771..1d7a9172d 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -22,6 +22,7 @@ use crossbeam_channel::{bounded, select, Receiver, Sender}; use std::io::Write; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; +use std::ops::ControlFlow; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use crate::types::SerBlock; @@ -93,21 +94,26 @@ impl Connection { /// Request and process the specified blocks (in the specified order). /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details. /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515). - pub(crate) fn for_blocks(&mut self, blockhashes: B, mut func: F) -> Result<()> + pub(crate) fn for_blocks( + &mut self, + blockhashes: B, + mut func: F, + ) -> Result> where B: IntoIterator, - F: FnMut(BlockHash, SerBlock), + F: FnMut(BlockHash, SerBlock) -> ControlFlow, { self.blocks_duration.observe_duration("total", || { let blockhashes: Vec = blockhashes.into_iter().collect(); if blockhashes.is_empty() { - return Ok(()); + return Ok(ControlFlow::Continue(())); } self.blocks_duration.observe_duration("request", || { debug!("loading {} blocks", blockhashes.len()); self.req_send.send(Request::get_blocks(&blockhashes)) })?; + let mut ret = ControlFlow::Continue(()); for hash in blockhashes { let block = self.blocks_duration.observe_duration("response", || { let block = self @@ -123,10 +129,13 @@ impl Connection { ); Ok(block) })?; - self.blocks_duration - .observe_duration("process", || func(hash, block)); + if ret.is_continue() { + ret = self + .blocks_duration + .observe_duration("process", || func(hash, block)); + } } - Ok(()) + Ok(ret) }) } diff --git a/src/status.rs b/src/status.rs index 41115b394..d6432026e 100644 --- a/src/status.rs +++ b/src/status.rs @@ -308,10 +308,15 @@ impl ScriptHashStatus { } /// Apply func only on the new blocks (fetched from daemon). - fn for_new_blocks(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()> + fn for_new_blocks( + &self, + blockhashes: B, + daemon: &Daemon, + func: F, + ) -> Result> where B: IntoIterator, - F: FnMut(BlockHash, SerBlock), + F: FnMut(BlockHash, SerBlock) -> ControlFlow, { daemon.for_blocks( blockhashes @@ -347,6 +352,7 @@ impl ScriptHashStatus { .or_insert_with(|| TxEntry::new(filtered_outputs.txid)) .outputs = filtered_outputs.result; } + ControlFlow::Continue::<()>(()) })?; let spending_blockhashes: HashSet = outpoints .par_iter() @@ -361,6 +367,7 @@ impl ScriptHashStatus { .or_insert_with(|| TxEntry::new(filtered_inputs.txid)) .spent = filtered_inputs.result; } + ControlFlow::Continue::<()>(()) })?; Ok(result diff --git a/src/tracker.rs b/src/tracker.rs index 9ed97f355..2823377dc 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -5,6 +5,7 @@ use bitcoin_slices::{ Error::VisitBreak, Visit, }; +use std::ops::ControlFlow; use crate::{ cache::Cache, @@ -109,17 +110,20 @@ impl Tracker { ) -> Result> { // Note: there are two blocks with coinbase transactions having same txid (see BIP-30) let blockhashes = self.index.filter_by_txid(txid); - let mut result = None; - daemon.for_blocks(blockhashes, |blockhash, block| { - if result.is_some() { - return; // keep first matching transaction - } + let result = daemon.for_blocks(blockhashes, |blockhash, block| { let mut visitor = FindTransaction::new(txid); - result = match bsl::Block::visit(&block, &mut visitor) { - Ok(_) | Err(VisitBreak) => visitor.tx_found().map(|tx| (blockhash, tx)), + match bsl::Block::visit(&block, &mut visitor) { + Ok(_) | Err(VisitBreak) => (), Err(e) => panic!("core returned invalid block: {:?}", e), - }; + } + match visitor.tx_found() { + Some(tx) => ControlFlow::Break((blockhash, tx)), + None => ControlFlow::Continue(()), + } })?; - Ok(result) + Ok(match result { + ControlFlow::Continue(..) => None, + ControlFlow::Break(x) => Some(x), + }) } }