diff --git a/src/electrum.rs b/src/electrum.rs index 46e0fa968..6e7fe7696 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -9,15 +9,17 @@ use rayon::prelude::*; use serde_derive::Deserialize; use serde_json::{self, json, Value}; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::fmt; use std::iter::FromIterator; use std::str::FromStr; +use std::sync::Arc; use crate::{ cache::Cache, config::{Config, ELECTRS_VERSION}, daemon::{self, extract_bitcoind_error, Daemon}, + mempool::MempoolSyncUpdate, merkle::Proof, metrics::{self, Histogram, Metrics}, signals::Signal, @@ -123,7 +125,7 @@ pub struct Rpc { tracker: Tracker, cache: Cache, rpc_duration: Histogram, - daemon: Daemon, + daemon: Arc, signal: Signal, banner: String, port: u16, @@ -147,7 +149,7 @@ impl Rpc { tracker, cache, rpc_duration, - daemon, + daemon: Arc::new(daemon), signal, banner: config.server_banner.clone(), port: config.electrum_rpc_addr.port(), @@ -158,12 +160,25 @@ impl Rpc { &self.signal } + pub(crate) fn daemon(&self) -> &Arc { + &self.daemon + } + pub fn new_block_notification(&self) -> Receiver<()> { self.daemon.new_block_notification() } - pub fn sync(&mut self) -> Result { - self.tracker.sync(&self.daemon, self.signal.exit_flag()) + pub fn sync_chain(&mut self) -> Result { + self.tracker + .sync_chain(&self.daemon, self.signal.exit_flag()) + } + + pub(crate) fn mempool_txids(&self) -> HashSet { + self.tracker.mempool.all_txids() + } + + pub(crate) fn mempool_apply(&mut self, sync_update: MempoolSyncUpdate) { + self.tracker.mempool.apply_sync_update(sync_update) } pub fn update_client(&self, client: &mut Client) -> Result> { @@ -432,7 +447,7 @@ impl Rpc { } fn get_fee_histogram(&self) -> Result { - Ok(json!(self.tracker.fees_histogram())) + Ok(json!(self.tracker.mempool.fees_histogram())) } fn server_id(&self) -> String { diff --git a/src/mempool.rs b/src/mempool.rs index ea3c2feda..11e59c97e 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -127,6 +127,10 @@ impl Mempool { &self.fees } + pub(crate) fn all_txids(&self) -> HashSet { + HashSet::::from_iter(self.entries.keys().copied()) + } + pub(crate) fn get(&self, txid: &Txid) -> Option<&Entry> { self.entries.get(txid) } @@ -186,22 +190,6 @@ impl Mempool { } } - pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) { - let old_txids = HashSet::::from_iter(self.entries.keys().copied()); - - let poll_result = MempoolSyncUpdate::poll(daemon, old_txids, exit_flag); - - let sync_update = match poll_result { - Ok(sync_update) => sync_update, - Err(e) => { - warn!("mempool sync failed: {}", e); - return; - } - }; - - self.apply_sync_update(sync_update); - } - /// Add a transaction entry to the mempool and update the fee histogram. fn add_entry(&mut self, entry: Entry) { for txi in &entry.tx.input { diff --git a/src/server.rs b/src/server.rs index f50662f70..008fa6b97 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,19 +1,23 @@ use anyhow::{Context, Result}; -use crossbeam_channel::{select, unbounded, Sender}; +use bitcoin::Txid; +use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender}; use rayon::prelude::*; use std::{ - collections::hash_map::HashMap, + collections::{HashMap, HashSet}, io::{BufRead, BufReader, Write}, iter::once, net::{Shutdown, TcpListener, TcpStream}, + sync::Arc, }; use crate::{ config::Config, + daemon::Daemon, electrum::{Client, Rpc}, + mempool::MempoolSyncUpdate, metrics::{self, Metrics}, - signals::ExitError, + signals::{ExitError, ExitFlag}, thread::spawn, }; @@ -87,19 +91,58 @@ fn serve() -> Result<()> { let new_block_rx = rpc.new_block_notification(); let mut peers = HashMap::::new(); + + let (mempool_update_tx, mempool_update_rx) = bounded(0); + let (mempool_run_tx, mempool_run_rx) = bounded(0); + + let daemon = Arc::clone(rpc.daemon()); + let exit_flag = rpc.signal().exit_flag().clone(); + if !config.ignore_mempool { + spawn("mempool_sync", move || { + mempool_sync_loop(daemon, mempool_update_tx, mempool_run_rx, exit_flag) + }); + } + loop { // initial sync and compaction may take a few hours while server_rx.is_empty() { - let done = duration.observe_duration("sync", || rpc.sync().context("sync failed"))?; // sync a batch of blocks + let done = + duration.observe_duration("sync", || rpc.sync_chain().context("sync failed"))?; // sync a batch of blocks peers = duration.observe_duration("notify", || notify_peers(&rpc, peers)); // peers are disconnected on error if !done { continue; // more blocks to sync } + if config.sync_once { return Ok(()); // exit after initial sync is done } + break; } + + select! { + // Start an asynchronous scan of the mempool if we aren't already doing so. + // You'd expect this might cause unneeded allocations copying mempool TXIDs, but + // `rpc.mempool_txids()` is only evaluated if the channel send is doesn't block. + send(mempool_run_tx, rpc.mempool_txids()) -> res => { + match res { + Ok(_) => (), + Err(_) => warn!("disconnected from mempool scan thread"), + }; + } + + // Received an update to the mempool state. Apply it. + recv(mempool_update_rx) -> res => { + match res { + Ok(sync_update) => rpc.mempool_apply(sync_update), + Err(_) => warn!("disconnected from mempool scan thread"), + }; + } + + // Mempool scanning in progress, or `ignore_mempool` is true. + default => {} + }; + duration.observe_duration("select", || -> Result<()> { select! { // Handle signals for graceful shutdown @@ -247,3 +290,24 @@ fn recv_loop(peer_id: usize, stream: &TcpStream, server_tx: Sender) -> Re server_tx.send(Event { peer_id, msg })?; Ok(()) } + +fn mempool_sync_loop( + daemon: Arc, + mempool_update_tx: Sender, + mempool_run_rx: Receiver>, + exit_flag: ExitFlag, +) -> Result<()> { + while let Ok(old_txids) = mempool_run_rx.recv() { + match MempoolSyncUpdate::poll(&daemon, old_txids, &exit_flag) { + Ok(sync_update) => { + let _ = mempool_update_tx.send(sync_update); + } + Err(e) => { + warn!("mempool sync failed: {}", e); + continue; + } + }; + } + debug!("mempool sync thread exiting"); + Ok(()) +} diff --git a/src/tracker.rs b/src/tracker.rs index 9ed97f355..b4736a8b1 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -13,7 +13,7 @@ use crate::{ daemon::Daemon, db::DBStore, index::Index, - mempool::{FeeHistogram, Mempool}, + mempool::Mempool, metrics::Metrics, signals::ExitFlag, status::{Balance, ScriptHashStatus, UnspentEntry}, @@ -21,10 +21,9 @@ use crate::{ /// Electrum protocol subscriptions' tracker pub struct Tracker { + pub(crate) mempool: Mempool, index: Index, - mempool: Mempool, metrics: Metrics, - ignore_mempool: bool, } pub(crate) enum Error { @@ -51,7 +50,6 @@ impl Tracker { .context("failed to open index")?, mempool: Mempool::new(&metrics), metrics, - ignore_mempool: config.ignore_mempool, }) } @@ -59,10 +57,6 @@ impl Tracker { self.index.chain() } - pub(crate) fn fees_histogram(&self) -> &FeeHistogram { - self.mempool.fees_histogram() - } - pub(crate) fn metrics(&self) -> &Metrics { &self.metrics } @@ -71,12 +65,8 @@ impl Tracker { status.get_unspent(self.index.chain()) } - pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result { + pub(crate) fn sync_chain(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result { let done = self.index.sync(daemon, exit_flag)?; - if done && !self.ignore_mempool { - self.mempool.sync(daemon, exit_flag); - // TODO: double check tip - and retry on diff - } Ok(done) }