Skip to content

Commit

Permalink
allow breaking inside for_blocks closures using ControlFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
antonilol committed May 25, 2024
1 parent 603830f commit cbb0ed1
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 19 deletions.
5 changes: 3 additions & 2 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde_json::{json, value::RawValue, Value};

use std::fs::File;
use std::io::Read;
use std::ops::ControlFlow;
use std::path::Path;

use crate::{
Expand Down Expand Up @@ -275,10 +276,10 @@ impl Daemon {
self.p2p.lock().get_new_headers(chain)
}

pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
pub(crate) fn for_blocks<B, F, R>(&self, blockhashes: B, func: F) -> Result<ControlFlow<R>>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, SerBlock),
F: FnMut(BlockHash, SerBlock) -> ControlFlow<R>,
{
self.p2p.lock().for_blocks(blockhashes, func)
}
Expand Down
1 change: 1 addition & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl Index {
index_single_block(blockhash, block, height, &mut batch);
});
self.stats.height.set("tip", height as f64);
ControlFlow::Continue::<()>(())
})?;
let heights: Vec<_> = heights.collect();
assert!(
Expand Down
21 changes: 15 additions & 6 deletions src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crossbeam_channel::{bounded, select, Receiver, Sender};

use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::ops::ControlFlow;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use crate::types::SerBlock;
Expand Down Expand Up @@ -93,21 +94,26 @@ impl Connection {
/// Request and process the specified blocks (in the specified order).
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
/// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
pub(crate) fn for_blocks<B, F, R>(
&mut self,
blockhashes: B,
mut func: F,
) -> Result<ControlFlow<R>>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, SerBlock),
F: FnMut(BlockHash, SerBlock) -> ControlFlow<R>,
{
self.blocks_duration.observe_duration("total", || {
let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
if blockhashes.is_empty() {
return Ok(());
return Ok(ControlFlow::Continue(()));
}
self.blocks_duration.observe_duration("request", || {
debug!("loading {} blocks", blockhashes.len());
self.req_send.send(Request::get_blocks(&blockhashes))
})?;

let mut ret = ControlFlow::Continue(());
for hash in blockhashes {
let block = self.blocks_duration.observe_duration("response", || {
let block = self
Expand All @@ -123,10 +129,13 @@ impl Connection {
);
Ok(block)
})?;
self.blocks_duration
.observe_duration("process", || func(hash, block));
if ret.is_continue() {
ret = self
.blocks_duration
.observe_duration("process", || func(hash, block));
}
}
Ok(())
Ok(ret)
})
}

Expand Down
11 changes: 9 additions & 2 deletions src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,15 @@ impl ScriptHashStatus {
}

/// Apply func only on the new blocks (fetched from daemon).
fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
fn for_new_blocks<B, F, R>(
&self,
blockhashes: B,
daemon: &Daemon,
func: F,
) -> Result<ControlFlow<R>>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, SerBlock),
F: FnMut(BlockHash, SerBlock) -> ControlFlow<R>,
{
daemon.for_blocks(
blockhashes
Expand Down Expand Up @@ -347,6 +352,7 @@ impl ScriptHashStatus {
.or_insert_with(|| TxEntry::new(filtered_outputs.txid))
.outputs = filtered_outputs.result;
}
ControlFlow::Continue::<()>(())
})?;
let spending_blockhashes: HashSet<BlockHash> = outpoints
.par_iter()
Expand All @@ -361,6 +367,7 @@ impl ScriptHashStatus {
.or_insert_with(|| TxEntry::new(filtered_inputs.txid))
.spent = filtered_inputs.result;
}
ControlFlow::Continue::<()>(())
})?;

Ok(result
Expand Down
22 changes: 13 additions & 9 deletions src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bitcoin_slices::{
Error::VisitBreak,
Visit,
};
use std::ops::ControlFlow;

use crate::{
cache::Cache,
Expand Down Expand Up @@ -109,17 +110,20 @@ impl Tracker {
) -> Result<Option<(BlockHash, Transaction)>> {
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
let blockhashes = self.index.filter_by_txid(txid);
let mut result = None;
daemon.for_blocks(blockhashes, |blockhash, block| {
if result.is_some() {
return; // keep first matching transaction
}
let result = daemon.for_blocks(blockhashes, |blockhash, block| {
let mut visitor = FindTransaction::new(txid);
result = match bsl::Block::visit(&block, &mut visitor) {
Ok(_) | Err(VisitBreak) => visitor.tx_found().map(|tx| (blockhash, tx)),
match bsl::Block::visit(&block, &mut visitor) {
Ok(_) | Err(VisitBreak) => (),
Err(e) => panic!("core returned invalid block: {:?}", e),
};
}
match visitor.tx_found() {
Some(tx) => ControlFlow::Break((blockhash, tx)),
None => ControlFlow::Continue(()),
}
})?;
Ok(result)
Ok(match result {
ControlFlow::Continue(..) => None,
ControlFlow::Break(x) => Some(x),
})
}
}

0 comments on commit cbb0ed1

Please sign in to comment.