Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async mempool scanning #970

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ impl Daemon {
.context("failed to get mempool txids")
}

pub(crate) fn get_mempool_entry(&self, txid: &Txid) -> Result<json::GetMempoolEntryResult> {
self.rpc
.get_mempool_entry(txid)
.context("failed to get mempool entry")
}

pub(crate) fn get_mempool_entries(
&self,
txids: &[Txid],
Expand Down
109 changes: 90 additions & 19 deletions src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ use rayon::prelude::*;
use serde_derive::Deserialize;
use serde_json::{self, json, Value};

use std::collections::{hash_map::Entry, HashMap};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::fmt;
use std::iter::FromIterator;
use std::str::FromStr;
use std::sync::Arc;

use crate::{
cache::Cache,
config::{Config, ELECTRS_VERSION},
daemon::{self, extract_bitcoind_error, Daemon},
mempool::{self, MempoolSyncUpdate},
merkle::Proof,
metrics::{self, Histogram, Metrics},
signals::Signal,
Expand Down Expand Up @@ -47,6 +49,20 @@ struct Request {
params: Value,
}

struct CallResult {
response: Value,
mempool_update: MempoolSyncUpdate,
}

impl CallResult {
fn new<T: serde::Serialize>(response: T) -> CallResult {
CallResult {
response: json!(response),
mempool_update: MempoolSyncUpdate::default(),
}
}
}

#[derive(Deserialize)]
#[serde(untagged)]
enum Requests {
Expand Down Expand Up @@ -123,7 +139,7 @@ pub struct Rpc {
tracker: Tracker,
cache: Cache,
rpc_duration: Histogram,
daemon: Daemon,
daemon: Arc<Daemon>,
signal: Signal,
banner: String,
port: u16,
Expand All @@ -147,7 +163,7 @@ impl Rpc {
tracker,
cache,
rpc_duration,
daemon,
daemon: Arc::new(daemon),
signal,
banner: config.server_banner.clone(),
port: config.electrum_rpc_addr.port(),
Expand All @@ -158,12 +174,25 @@ impl Rpc {
&self.signal
}

pub(crate) fn daemon(&self) -> &Arc<Daemon> {
&self.daemon
}

pub fn new_block_notification(&self) -> Receiver<()> {
self.daemon.new_block_notification()
}

pub fn sync(&mut self) -> Result<bool> {
self.tracker.sync(&self.daemon, self.signal.exit_flag())
pub fn sync_chain(&mut self) -> Result<bool> {
self.tracker
.sync_chain(&self.daemon, self.signal.exit_flag())
}

pub(crate) fn mempool_txids(&self) -> HashSet<Txid> {
self.tracker.mempool.all_txids()
}

pub(crate) fn mempool_apply(&mut self, sync_update: MempoolSyncUpdate) {
self.tracker.mempool.apply_sync_update(sync_update)
}

pub fn update_client(&self, client: &mut Client) -> Result<Vec<String>> {
Expand Down Expand Up @@ -357,11 +386,20 @@ impl Rpc {
Ok(status)
}

fn transaction_broadcast(&self, (tx_hex,): &(String,)) -> Result<Value> {
fn transaction_broadcast(&self, (tx_hex,): &(String,)) -> Result<(Value, MempoolSyncUpdate)> {
let tx_bytes = Vec::from_hex(tx_hex).context("non-hex transaction")?;
let tx = deserialize(&tx_bytes).context("invalid transaction")?;
let txid = self.daemon.broadcast(&tx)?;
Ok(json!(txid))

// Try to fetch the mempool entry immediately, so we can return an update
// to be applied to the mempool.
let mut mempool_update = MempoolSyncUpdate::default();
if let Ok(rpc_entry) = self.daemon.get_mempool_entry(&txid) {
let entry = mempool::Entry::new(txid, tx, rpc_entry);
mempool_update.new_entries.push(entry);
}

Ok((json!(txid), mempool_update))
}

fn transaction_get(&self, args: &TxGetArgs) -> Result<Value> {
Expand Down Expand Up @@ -432,7 +470,7 @@ impl Rpc {
}

fn get_fee_histogram(&self) -> Result<Value> {
Ok(json!(self.tracker.fees_histogram()))
Ok(json!(self.tracker.mempool.fees_histogram()))
}

fn server_id(&self) -> String {
Expand Down Expand Up @@ -460,7 +498,7 @@ impl Rpc {
}))
}

pub fn handle_requests(&self, client: &mut Client, lines: &[String]) -> Vec<String> {
pub fn handle_requests(&mut self, client: &mut Client, lines: &[String]) -> Vec<String> {
lines
.iter()
.map(|line| {
Expand All @@ -472,7 +510,7 @@ impl Rpc {
.collect()
}

fn handle_calls(&self, client: &mut Client, calls: Result<Calls, Value>) -> Value {
fn handle_calls(&mut self, client: &mut Client, calls: Result<Calls, Value>) -> Value {
let calls: Calls = match calls {
Ok(calls) => calls,
Err(response) => return response, // JSON parsing failed - the response does not contain request id
Expand All @@ -483,12 +521,33 @@ impl Rpc {
if let Some(result) = self.try_multi_call(client, &batch) {
return json!(result);
}
json!(batch
let responses = batch
.into_iter()
.map(|result| self.single_call(client, result))
.collect::<Vec<Value>>())
.map(|call| {
let CallResult {
response,
mempool_update,
} = self.single_call(client, call);

// Apply the mempool update immediately, so that the next
// response will reflect the updated mempool state.
self.mempool_apply(mempool_update);

response
})
.collect::<Vec<Value>>();
json!(responses)
}

Calls::Single(call) => {
let CallResult {
response,
mempool_update,
} = self.single_call(client, call);

self.mempool_apply(mempool_update);
response
}
Calls::Single(result) => self.single_call(client, result),
}
}

Expand Down Expand Up @@ -523,10 +582,10 @@ impl Rpc {
)
}

fn single_call(&self, client: &mut Client, call: Result<Call, Value>) -> Value {
fn single_call(&self, client: &mut Client, call: Result<Call, Value>) -> CallResult {
let call = match call {
Ok(call) => call,
Err(response) => return response, // params parsing may fail - the response contains request id
Err(response) => return CallResult::new(response), // params parsing may fail - the response contains request id
};
self.rpc_duration.observe_duration(&call.method, || {
if self.tracker.status().is_err() {
Expand All @@ -536,9 +595,11 @@ impl Rpc {
| Params::BlockHeaders(_)
| Params::HeadersSubscribe
| Params::Version(_) => (),
_ => return error_msg(&call.id, RpcError::UnavailableIndex),
_ => return CallResult::new(error_msg(&call.id, RpcError::UnavailableIndex)),
};
}

let mut mempool_update = MempoolSyncUpdate::default();
let result = match &call.params {
Params::Banner => Ok(json!(self.banner)),
Params::BlockHeader(args) => self.block_header(*args),
Expand All @@ -556,13 +617,23 @@ impl Rpc {
Params::ScriptHashListUnspent(args) => self.scripthash_list_unspent(client, args),
Params::ScriptHashSubscribe(args) => self.scripthash_subscribe(client, args),
Params::ScriptHashUnsubscribe(args) => self.scripthash_unsubscribe(client, args),
Params::TransactionBroadcast(args) => self.transaction_broadcast(args),
Params::TransactionBroadcast(args) => {
self.transaction_broadcast(args)
.map(|(result, sync_update)| {
mempool_update = sync_update; // extract the mempool sync update
result
})
}
Params::TransactionGet(args) => self.transaction_get(args),
Params::TransactionGetMerkle(args) => self.transaction_get_merkle(args),
Params::TransactionFromPosition(args) => self.transaction_from_pos(*args),
Params::Version(args) => self.version(args),
};
call.response(result)

CallResult {
response: call.response(result),
mempool_update,
}
})
}
}
Expand Down
44 changes: 27 additions & 17 deletions src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,22 @@ pub(crate) struct Entry {
pub has_unconfirmed_inputs: bool,
}

impl Entry {
pub fn new(
txid: Txid,
tx: Transaction,
rpc_entry: bitcoincore_rpc::json::GetMempoolEntryResult,
) -> Entry {
Entry {
txid,
tx,
vsize: rpc_entry.vsize,
fee: rpc_entry.fees.base,
has_unconfirmed_inputs: !rpc_entry.depends.is_empty(),
}
}
}

/// Mempool current state
pub(crate) struct Mempool {
entries: HashMap<Txid, Entry>,
Expand All @@ -38,8 +54,9 @@ pub(crate) struct Mempool {
/// An update to [`Mempool`]'s internal state. This can be fetched
/// asynchronously using [`MempoolSyncUpdate::poll`], and applied
/// using [`Mempool::apply_sync_update`].
#[derive(Default)]
pub(crate) struct MempoolSyncUpdate {
new_entries: Vec<Entry>,
pub(crate) new_entries: Vec<Entry>,
removed_entries: HashSet<Txid>,
}

Expand Down Expand Up @@ -139,6 +156,10 @@ impl Mempool {
&self.fees
}

pub(crate) fn all_txids(&self) -> HashSet<Txid> {
HashSet::<Txid>::from_iter(self.entries.keys().copied())
}

pub(crate) fn get(&self, txid: &Txid) -> Option<&Entry> {
self.entries.get(txid)
}
Expand Down Expand Up @@ -170,6 +191,11 @@ impl Mempool {
let removed = update.removed_entries.len();
let added = update.new_entries.len();

// Return early to avoid spurious logs.
if added == 0 && removed == 0 {
return;
}

for txid_to_remove in update.removed_entries {
self.remove_entry(txid_to_remove);
}
Expand Down Expand Up @@ -198,22 +224,6 @@ impl Mempool {
}
}

pub fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) {
let old_txids = HashSet::<Txid>::from_iter(self.entries.keys().copied());

let poll_result = MempoolSyncUpdate::poll(daemon, old_txids, exit_flag);

let sync_update = match poll_result {
Ok(sync_update) => sync_update,
Err(e) => {
warn!("mempool sync failed: {}", e);
return;
}
};

self.apply_sync_update(sync_update);
}

/// Add a transaction entry to the mempool and update the fee histogram.
fn add_entry(&mut self, entry: Entry) {
for txi in &entry.tx.input {
Expand Down
Loading