diff --git a/src/daemon.rs b/src/daemon.rs index f39c2ce9b..da85a28c3 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -220,6 +220,12 @@ impl Daemon { .context("failed to get mempool txids") } + pub(crate) fn get_mempool_entry(&self, txid: &Txid) -> Result { + self.rpc + .get_mempool_entry(txid) + .context("failed to get mempool entry") + } + pub(crate) fn get_mempool_entries( &self, txids: &[Txid], diff --git a/src/electrum.rs b/src/electrum.rs index 46e0fa968..4071b27ec 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -9,15 +9,17 @@ use rayon::prelude::*; use serde_derive::Deserialize; use serde_json::{self, json, Value}; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::fmt; use std::iter::FromIterator; use std::str::FromStr; +use std::sync::Arc; use crate::{ cache::Cache, config::{Config, ELECTRS_VERSION}, daemon::{self, extract_bitcoind_error, Daemon}, + mempool::{self, MempoolSyncUpdate}, merkle::Proof, metrics::{self, Histogram, Metrics}, signals::Signal, @@ -47,6 +49,20 @@ struct Request { params: Value, } +struct CallResult { + response: Value, + mempool_update: MempoolSyncUpdate, +} + +impl CallResult { + fn new(response: T) -> CallResult { + CallResult { + response: json!(response), + mempool_update: MempoolSyncUpdate::default(), + } + } +} + #[derive(Deserialize)] #[serde(untagged)] enum Requests { @@ -123,7 +139,7 @@ pub struct Rpc { tracker: Tracker, cache: Cache, rpc_duration: Histogram, - daemon: Daemon, + daemon: Arc, signal: Signal, banner: String, port: u16, @@ -147,7 +163,7 @@ impl Rpc { tracker, cache, rpc_duration, - daemon, + daemon: Arc::new(daemon), signal, banner: config.server_banner.clone(), port: config.electrum_rpc_addr.port(), @@ -158,12 +174,25 @@ impl Rpc { &self.signal } + pub(crate) fn daemon(&self) -> &Arc { + &self.daemon + } + pub fn new_block_notification(&self) -> Receiver<()> { self.daemon.new_block_notification() } - pub fn sync(&mut self) -> Result { - self.tracker.sync(&self.daemon, self.signal.exit_flag()) + pub fn sync_chain(&mut self) -> Result { + self.tracker + .sync_chain(&self.daemon, self.signal.exit_flag()) + } + + pub(crate) fn mempool_txids(&self) -> HashSet { + self.tracker.mempool.all_txids() + } + + pub(crate) fn mempool_apply(&mut self, sync_update: MempoolSyncUpdate) { + self.tracker.mempool.apply_sync_update(sync_update) } pub fn update_client(&self, client: &mut Client) -> Result> { @@ -357,11 +386,20 @@ impl Rpc { Ok(status) } - fn transaction_broadcast(&self, (tx_hex,): &(String,)) -> Result { + fn transaction_broadcast(&self, (tx_hex,): &(String,)) -> Result<(Value, MempoolSyncUpdate)> { let tx_bytes = Vec::from_hex(tx_hex).context("non-hex transaction")?; let tx = deserialize(&tx_bytes).context("invalid transaction")?; let txid = self.daemon.broadcast(&tx)?; - Ok(json!(txid)) + + // Try to fetch the mempool entry immediately, so we can return an update + // to be applied to the mempool. + let mut mempool_update = MempoolSyncUpdate::default(); + if let Ok(rpc_entry) = self.daemon.get_mempool_entry(&txid) { + let entry = mempool::Entry::new(txid, tx, rpc_entry); + mempool_update.new_entries.push(entry); + } + + Ok((json!(txid), mempool_update)) } fn transaction_get(&self, args: &TxGetArgs) -> Result { @@ -432,7 +470,7 @@ impl Rpc { } fn get_fee_histogram(&self) -> Result { - Ok(json!(self.tracker.fees_histogram())) + Ok(json!(self.tracker.mempool.fees_histogram())) } fn server_id(&self) -> String { @@ -460,7 +498,7 @@ impl Rpc { })) } - pub fn handle_requests(&self, client: &mut Client, lines: &[String]) -> Vec { + pub fn handle_requests(&mut self, client: &mut Client, lines: &[String]) -> Vec { lines .iter() .map(|line| { @@ -472,7 +510,7 @@ impl Rpc { .collect() } - fn handle_calls(&self, client: &mut Client, calls: Result) -> Value { + fn handle_calls(&mut self, client: &mut Client, calls: Result) -> Value { let calls: Calls = match calls { Ok(calls) => calls, Err(response) => return response, // JSON parsing failed - the response does not contain request id @@ -483,12 +521,33 @@ impl Rpc { if let Some(result) = self.try_multi_call(client, &batch) { return json!(result); } - json!(batch + let responses = batch .into_iter() - .map(|result| self.single_call(client, result)) - .collect::>()) + .map(|call| { + let CallResult { + response, + mempool_update, + } = self.single_call(client, call); + + // Apply the mempool update immediately, so that the next + // response will reflect the updated mempool state. + self.mempool_apply(mempool_update); + + response + }) + .collect::>(); + json!(responses) + } + + Calls::Single(call) => { + let CallResult { + response, + mempool_update, + } = self.single_call(client, call); + + self.mempool_apply(mempool_update); + response } - Calls::Single(result) => self.single_call(client, result), } } @@ -523,10 +582,10 @@ impl Rpc { ) } - fn single_call(&self, client: &mut Client, call: Result) -> Value { + fn single_call(&self, client: &mut Client, call: Result) -> CallResult { let call = match call { Ok(call) => call, - Err(response) => return response, // params parsing may fail - the response contains request id + Err(response) => return CallResult::new(response), // params parsing may fail - the response contains request id }; self.rpc_duration.observe_duration(&call.method, || { if self.tracker.status().is_err() { @@ -536,9 +595,11 @@ impl Rpc { | Params::BlockHeaders(_) | Params::HeadersSubscribe | Params::Version(_) => (), - _ => return error_msg(&call.id, RpcError::UnavailableIndex), + _ => return CallResult::new(error_msg(&call.id, RpcError::UnavailableIndex)), }; } + + let mut mempool_update = MempoolSyncUpdate::default(); let result = match &call.params { Params::Banner => Ok(json!(self.banner)), Params::BlockHeader(args) => self.block_header(*args), @@ -556,13 +617,23 @@ impl Rpc { Params::ScriptHashListUnspent(args) => self.scripthash_list_unspent(client, args), Params::ScriptHashSubscribe(args) => self.scripthash_subscribe(client, args), Params::ScriptHashUnsubscribe(args) => self.scripthash_unsubscribe(client, args), - Params::TransactionBroadcast(args) => self.transaction_broadcast(args), + Params::TransactionBroadcast(args) => { + self.transaction_broadcast(args) + .map(|(result, sync_update)| { + mempool_update = sync_update; // extract the mempool sync update + result + }) + } Params::TransactionGet(args) => self.transaction_get(args), Params::TransactionGetMerkle(args) => self.transaction_get_merkle(args), Params::TransactionFromPosition(args) => self.transaction_from_pos(*args), Params::Version(args) => self.version(args), }; - call.response(result) + + CallResult { + response: call.response(result), + mempool_update, + } }) } } diff --git a/src/mempool.rs b/src/mempool.rs index 1108a8142..15ac50eb6 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -24,6 +24,22 @@ pub(crate) struct Entry { pub has_unconfirmed_inputs: bool, } +impl Entry { + pub fn new( + txid: Txid, + tx: Transaction, + rpc_entry: bitcoincore_rpc::json::GetMempoolEntryResult, + ) -> Entry { + Entry { + txid, + tx, + vsize: rpc_entry.vsize, + fee: rpc_entry.fees.base, + has_unconfirmed_inputs: !rpc_entry.depends.is_empty(), + } + } +} + /// Mempool current state pub(crate) struct Mempool { entries: HashMap, @@ -38,8 +54,9 @@ pub(crate) struct Mempool { /// An update to [`Mempool`]'s internal state. This can be fetched /// asynchronously using [`MempoolSyncUpdate::poll`], and applied /// using [`Mempool::apply_sync_update`]. +#[derive(Default)] pub(crate) struct MempoolSyncUpdate { - new_entries: Vec, + pub(crate) new_entries: Vec, removed_entries: HashSet, } @@ -139,6 +156,10 @@ impl Mempool { &self.fees } + pub(crate) fn all_txids(&self) -> HashSet { + HashSet::::from_iter(self.entries.keys().copied()) + } + pub(crate) fn get(&self, txid: &Txid) -> Option<&Entry> { self.entries.get(txid) } @@ -170,6 +191,11 @@ impl Mempool { let removed = update.removed_entries.len(); let added = update.new_entries.len(); + // Return early to avoid spurious logs. + if added == 0 && removed == 0 { + return; + } + for txid_to_remove in update.removed_entries { self.remove_entry(txid_to_remove); } @@ -198,22 +224,6 @@ impl Mempool { } } - pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) { - let old_txids = HashSet::::from_iter(self.entries.keys().copied()); - - let poll_result = MempoolSyncUpdate::poll(daemon, old_txids, exit_flag); - - let sync_update = match poll_result { - Ok(sync_update) => sync_update, - Err(e) => { - warn!("mempool sync failed: {}", e); - return; - } - }; - - self.apply_sync_update(sync_update); - } - /// Add a transaction entry to the mempool and update the fee histogram. fn add_entry(&mut self, entry: Entry) { for txi in &entry.tx.input { diff --git a/src/server.rs b/src/server.rs index f50662f70..cda0c6207 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,19 +1,23 @@ use anyhow::{Context, Result}; -use crossbeam_channel::{select, unbounded, Sender}; +use bitcoin::Txid; +use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender}; use rayon::prelude::*; use std::{ - collections::hash_map::HashMap, + collections::{HashMap, HashSet}, io::{BufRead, BufReader, Write}, iter::once, net::{Shutdown, TcpListener, TcpStream}, + sync::Arc, }; use crate::{ config::Config, + daemon::Daemon, electrum::{Client, Rpc}, + mempool::MempoolSyncUpdate, metrics::{self, Metrics}, - signals::ExitError, + signals::{ExitError, ExitFlag}, thread::spawn, }; @@ -87,19 +91,58 @@ fn serve() -> Result<()> { let new_block_rx = rpc.new_block_notification(); let mut peers = HashMap::::new(); + + let (mempool_update_tx, mempool_update_rx) = bounded(0); + let (mempool_run_tx, mempool_run_rx) = bounded(0); + + let daemon = Arc::clone(rpc.daemon()); + let exit_flag = rpc.signal().exit_flag().clone(); + if !config.ignore_mempool { + spawn("mempool_sync", move || { + mempool_sync_loop(daemon, mempool_update_tx, mempool_run_rx, exit_flag) + }); + } + loop { // initial sync and compaction may take a few hours while server_rx.is_empty() { - let done = duration.observe_duration("sync", || rpc.sync().context("sync failed"))?; // sync a batch of blocks + let done = + duration.observe_duration("sync", || rpc.sync_chain().context("sync failed"))?; // sync a batch of blocks peers = duration.observe_duration("notify", || notify_peers(&rpc, peers)); // peers are disconnected on error if !done { continue; // more blocks to sync } + if config.sync_once { return Ok(()); // exit after initial sync is done } + break; } + + select! { + // Start an asynchronous scan of the mempool if we aren't already doing so. + // You'd expect this might cause unneeded allocations copying mempool TXIDs, but + // `rpc.mempool_txids()` is only evaluated if the channel send is doesn't block. + send(mempool_run_tx, rpc.mempool_txids()) -> res => { + match res { + Ok(_) => (), + Err(_) => warn!("disconnected from mempool scan thread"), + }; + } + + // Received an update to the mempool state. Apply it. + recv(mempool_update_rx) -> res => { + match res { + Ok(sync_update) => rpc.mempool_apply(sync_update), + Err(_) => warn!("disconnected from mempool scan thread"), + }; + } + + // Mempool scanning in progress, or `ignore_mempool` is true. + default => {} + }; + duration.observe_duration("select", || -> Result<()> { select! { // Handle signals for graceful shutdown @@ -121,7 +164,7 @@ fn serve() -> Result<()> { let rest = server_rx.iter().take(server_rx.len()); let events: Vec = first.chain(rest).collect(); server_batch_size.observe("recv", events.len() as f64); - duration.observe_duration("handle", || handle_events(&rpc, &mut peers, events)); + duration.observe_duration("handle", || handle_events(&mut rpc, &mut peers, events)); }, default(config.wait_duration) => (), // sync and update }; @@ -163,7 +206,7 @@ enum Message { Done, } -fn handle_events(rpc: &Rpc, peers: &mut HashMap, events: Vec) { +fn handle_events(rpc: &mut Rpc, peers: &mut HashMap, events: Vec) { let mut events_by_peer = HashMap::>::new(); events .into_iter() @@ -174,7 +217,7 @@ fn handle_events(rpc: &Rpc, peers: &mut HashMap, events: Vec } fn handle_peer_events( - rpc: &Rpc, + rpc: &mut Rpc, peers: &mut HashMap, peer_id: usize, messages: Vec, @@ -247,3 +290,24 @@ fn recv_loop(peer_id: usize, stream: &TcpStream, server_tx: Sender) -> Re server_tx.send(Event { peer_id, msg })?; Ok(()) } + +fn mempool_sync_loop( + daemon: Arc, + mempool_update_tx: Sender, + mempool_run_rx: Receiver>, + exit_flag: ExitFlag, +) -> Result<()> { + while let Ok(old_txids) = mempool_run_rx.recv() { + match MempoolSyncUpdate::poll(&daemon, old_txids, &exit_flag) { + Ok(sync_update) => { + let _ = mempool_update_tx.send(sync_update); + } + Err(e) => { + warn!("mempool sync failed: {}", e); + continue; + } + }; + } + debug!("mempool sync thread exiting"); + Ok(()) +} diff --git a/src/tracker.rs b/src/tracker.rs index 9ed97f355..b4736a8b1 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -13,7 +13,7 @@ use crate::{ daemon::Daemon, db::DBStore, index::Index, - mempool::{FeeHistogram, Mempool}, + mempool::Mempool, metrics::Metrics, signals::ExitFlag, status::{Balance, ScriptHashStatus, UnspentEntry}, @@ -21,10 +21,9 @@ use crate::{ /// Electrum protocol subscriptions' tracker pub struct Tracker { + pub(crate) mempool: Mempool, index: Index, - mempool: Mempool, metrics: Metrics, - ignore_mempool: bool, } pub(crate) enum Error { @@ -51,7 +50,6 @@ impl Tracker { .context("failed to open index")?, mempool: Mempool::new(&metrics), metrics, - ignore_mempool: config.ignore_mempool, }) } @@ -59,10 +57,6 @@ impl Tracker { self.index.chain() } - pub(crate) fn fees_histogram(&self) -> &FeeHistogram { - self.mempool.fees_histogram() - } - pub(crate) fn metrics(&self) -> &Metrics { &self.metrics } @@ -71,12 +65,8 @@ impl Tracker { status.get_unspent(self.index.chain()) } - pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result { + pub(crate) fn sync_chain(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result { let done = self.index.sync(daemon, exit_flag)?; - if done && !self.ignore_mempool { - self.mempool.sync(daemon, exit_flag); - // TODO: double check tip - and retry on diff - } Ok(done) }