Skip to content

Commit

Permalink
execute mempool scanning in separate thread
Browse files Browse the repository at this point in the history
The mempool scanning procedure can now be launched
in a separate thread, while mempool updates are
applied synchronously in the main thread.
  • Loading branch information
conduition committed Dec 12, 2023
1 parent 06303d9 commit cd4c279
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 39 deletions.
27 changes: 21 additions & 6 deletions src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ use rayon::prelude::*;
use serde_derive::Deserialize;
use serde_json::{self, json, Value};

use std::collections::{hash_map::Entry, HashMap};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::fmt;
use std::iter::FromIterator;
use std::str::FromStr;
use std::sync::Arc;

use crate::{
cache::Cache,
config::{Config, ELECTRS_VERSION},
daemon::{self, extract_bitcoind_error, Daemon},
mempool::MempoolSyncUpdate,
merkle::Proof,
metrics::{self, Histogram, Metrics},
signals::Signal,
Expand Down Expand Up @@ -123,7 +125,7 @@ pub struct Rpc {
tracker: Tracker,
cache: Cache,
rpc_duration: Histogram,
daemon: Daemon,
daemon: Arc<Daemon>,
signal: Signal,
banner: String,
port: u16,
Expand All @@ -147,7 +149,7 @@ impl Rpc {
tracker,
cache,
rpc_duration,
daemon,
daemon: Arc::new(daemon),
signal,
banner: config.server_banner.clone(),
port: config.electrum_rpc_addr.port(),
Expand All @@ -158,12 +160,25 @@ impl Rpc {
&self.signal
}

pub(crate) fn daemon(&self) -> &Arc<Daemon> {
&self.daemon
}

pub fn new_block_notification(&self) -> Receiver<()> {
self.daemon.new_block_notification()
}

pub fn sync(&mut self) -> Result<bool> {
self.tracker.sync(&self.daemon, self.signal.exit_flag())
pub fn sync_chain(&mut self) -> Result<bool> {
self.tracker
.sync_chain(&self.daemon, self.signal.exit_flag())
}

pub(crate) fn mempool_txids(&self) -> HashSet<Txid> {
self.tracker.mempool.all_txids()
}

pub(crate) fn mempool_apply(&mut self, sync_update: MempoolSyncUpdate) {
self.tracker.mempool.apply_sync_update(sync_update)
}

pub fn update_client(&self, client: &mut Client) -> Result<Vec<String>> {
Expand Down Expand Up @@ -432,7 +447,7 @@ impl Rpc {
}

fn get_fee_histogram(&self) -> Result<Value> {
Ok(json!(self.tracker.fees_histogram()))
Ok(json!(self.tracker.mempool.fees_histogram()))
}

fn server_id(&self) -> String {
Expand Down
20 changes: 4 additions & 16 deletions src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl Mempool {
&self.fees
}

pub(crate) fn all_txids(&self) -> HashSet<Txid> {
HashSet::<Txid>::from_iter(self.entries.keys().copied())
}

pub(crate) fn get(&self, txid: &Txid) -> Option<&Entry> {
self.entries.get(txid)
}
Expand Down Expand Up @@ -186,22 +190,6 @@ impl Mempool {
}
}

pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) {
let old_txids = HashSet::<Txid>::from_iter(self.entries.keys().copied());

let poll_result = MempoolSyncUpdate::poll(daemon, old_txids, exit_flag);

let sync_update = match poll_result {
Ok(sync_update) => sync_update,
Err(e) => {
warn!("mempool sync failed: {}", e);
return;
}
};

self.apply_sync_update(sync_update);
}

/// Add a transaction entry to the mempool and update the fee histogram.
fn add_entry(&mut self, entry: Entry) {
for txi in &entry.tx.input {
Expand Down
72 changes: 68 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use anyhow::{Context, Result};
use crossbeam_channel::{select, unbounded, Sender};
use bitcoin::Txid;
use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender};
use rayon::prelude::*;

use std::{
collections::hash_map::HashMap,
collections::{HashMap, HashSet},
io::{BufRead, BufReader, Write},
iter::once,
net::{Shutdown, TcpListener, TcpStream},
sync::Arc,
};

use crate::{
config::Config,
daemon::Daemon,
electrum::{Client, Rpc},
mempool::MempoolSyncUpdate,
metrics::{self, Metrics},
signals::ExitError,
signals::{ExitError, ExitFlag},
thread::spawn,
};

Expand Down Expand Up @@ -87,19 +91,58 @@ fn serve() -> Result<()> {

let new_block_rx = rpc.new_block_notification();
let mut peers = HashMap::<usize, Peer>::new();

let (mempool_update_tx, mempool_update_rx) = bounded(0);
let (mempool_run_tx, mempool_run_rx) = bounded(0);

let daemon = Arc::clone(rpc.daemon());
let exit_flag = rpc.signal().exit_flag().clone();
if !config.ignore_mempool {
spawn("mempool_sync", move || {
mempool_sync_loop(daemon, mempool_update_tx, mempool_run_rx, exit_flag)
});
}

loop {
// initial sync and compaction may take a few hours
while server_rx.is_empty() {
let done = duration.observe_duration("sync", || rpc.sync().context("sync failed"))?; // sync a batch of blocks
let done =
duration.observe_duration("sync", || rpc.sync_chain().context("sync failed"))?; // sync a batch of blocks
peers = duration.observe_duration("notify", || notify_peers(&rpc, peers)); // peers are disconnected on error
if !done {
continue; // more blocks to sync
}

if config.sync_once {
return Ok(()); // exit after initial sync is done
}

break;
}

select! {
// Start an asynchronous scan of the mempool if we aren't already doing so.
// You'd expect this might cause unneeded allocations copying mempool TXIDs, but
// `rpc.mempool_txids()` is only evaluated if the channel send is doesn't block.
send(mempool_run_tx, rpc.mempool_txids()) -> res => {
match res {
Ok(_) => (),
Err(_) => warn!("disconnected from mempool scan thread"),
};
}

// Received an update to the mempool state. Apply it.
recv(mempool_update_rx) -> res => {
match res {
Ok(sync_update) => rpc.mempool_apply(sync_update),
Err(_) => warn!("disconnected from mempool scan thread"),
};
}

// Mempool scanning in progress, or `ignore_mempool` is true.
default => {}
};

duration.observe_duration("select", || -> Result<()> {
select! {
// Handle signals for graceful shutdown
Expand Down Expand Up @@ -247,3 +290,24 @@ fn recv_loop(peer_id: usize, stream: &TcpStream, server_tx: Sender<Event>) -> Re
server_tx.send(Event { peer_id, msg })?;
Ok(())
}

fn mempool_sync_loop(
daemon: Arc<Daemon>,
mempool_update_tx: Sender<MempoolSyncUpdate>,
mempool_run_rx: Receiver<HashSet<Txid>>,
exit_flag: ExitFlag,
) -> Result<()> {
while let Ok(old_txids) = mempool_run_rx.recv() {
match MempoolSyncUpdate::poll(&daemon, old_txids, &exit_flag) {
Ok(sync_update) => {
let _ = mempool_update_tx.send(sync_update);
}
Err(e) => {
warn!("mempool sync failed: {}", e);
continue;
}
};
}
debug!("mempool sync thread exiting");
Ok(())
}
16 changes: 3 additions & 13 deletions src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@ use crate::{
daemon::Daemon,
db::DBStore,
index::Index,
mempool::{FeeHistogram, Mempool},
mempool::Mempool,
metrics::Metrics,
signals::ExitFlag,
status::{Balance, ScriptHashStatus, UnspentEntry},
};

/// Electrum protocol subscriptions' tracker
pub struct Tracker {
pub(crate) mempool: Mempool,
index: Index,
mempool: Mempool,
metrics: Metrics,
ignore_mempool: bool,
}

pub(crate) enum Error {
Expand All @@ -51,18 +50,13 @@ impl Tracker {
.context("failed to open index")?,
mempool: Mempool::new(&metrics),
metrics,
ignore_mempool: config.ignore_mempool,
})
}

pub(crate) fn chain(&self) -> &Chain {
self.index.chain()
}

pub(crate) fn fees_histogram(&self) -> &FeeHistogram {
self.mempool.fees_histogram()
}

pub(crate) fn metrics(&self) -> &Metrics {
&self.metrics
}
Expand All @@ -71,12 +65,8 @@ impl Tracker {
status.get_unspent(self.index.chain())
}

pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<bool> {
pub(crate) fn sync_chain(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<bool> {
let done = self.index.sync(daemon, exit_flag)?;
if done && !self.ignore_mempool {
self.mempool.sync(daemon, exit_flag);
// TODO: double check tip - and retry on diff
}
Ok(done)
}

Expand Down

0 comments on commit cd4c279

Please sign in to comment.