From 5eb41f052a83453add9d449a4be4e7b8b5325db9 Mon Sep 17 00:00:00 2001 From: conduition Date: Sun, 17 Dec 2023 22:32:15 +0000 Subject: [PATCH] immediately fetch TX from mempool after broadcast --- src/electrum.rs | 84 ++++++++++++++++++++++++++++++++++++++++--------- src/mempool.rs | 19 ++++++++++- src/server.rs | 6 ++-- 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/src/electrum.rs b/src/electrum.rs index 6e7fe7696..4071b27ec 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -19,7 +19,7 @@ use crate::{ cache::Cache, config::{Config, ELECTRS_VERSION}, daemon::{self, extract_bitcoind_error, Daemon}, - mempool::MempoolSyncUpdate, + mempool::{self, MempoolSyncUpdate}, merkle::Proof, metrics::{self, Histogram, Metrics}, signals::Signal, @@ -49,6 +49,20 @@ struct Request { params: Value, } +struct CallResult { + response: Value, + mempool_update: MempoolSyncUpdate, +} + +impl CallResult { + fn new(response: T) -> CallResult { + CallResult { + response: json!(response), + mempool_update: MempoolSyncUpdate::default(), + } + } +} + #[derive(Deserialize)] #[serde(untagged)] enum Requests { @@ -372,11 +386,20 @@ impl Rpc { Ok(status) } - fn transaction_broadcast(&self, (tx_hex,): &(String,)) -> Result { + fn transaction_broadcast(&self, (tx_hex,): &(String,)) -> Result<(Value, MempoolSyncUpdate)> { let tx_bytes = Vec::from_hex(tx_hex).context("non-hex transaction")?; let tx = deserialize(&tx_bytes).context("invalid transaction")?; let txid = self.daemon.broadcast(&tx)?; - Ok(json!(txid)) + + // Try to fetch the mempool entry immediately, so we can return an update + // to be applied to the mempool. + let mut mempool_update = MempoolSyncUpdate::default(); + if let Ok(rpc_entry) = self.daemon.get_mempool_entry(&txid) { + let entry = mempool::Entry::new(txid, tx, rpc_entry); + mempool_update.new_entries.push(entry); + } + + Ok((json!(txid), mempool_update)) } fn transaction_get(&self, args: &TxGetArgs) -> Result { @@ -475,7 +498,7 @@ impl Rpc { })) } - pub fn handle_requests(&self, client: &mut Client, lines: &[String]) -> Vec { + pub fn handle_requests(&mut self, client: &mut Client, lines: &[String]) -> Vec { lines .iter() .map(|line| { @@ -487,7 +510,7 @@ impl Rpc { .collect() } - fn handle_calls(&self, client: &mut Client, calls: Result) -> Value { + fn handle_calls(&mut self, client: &mut Client, calls: Result) -> Value { let calls: Calls = match calls { Ok(calls) => calls, Err(response) => return response, // JSON parsing failed - the response does not contain request id @@ -498,12 +521,33 @@ impl Rpc { if let Some(result) = self.try_multi_call(client, &batch) { return json!(result); } - json!(batch + let responses = batch .into_iter() - .map(|result| self.single_call(client, result)) - .collect::>()) + .map(|call| { + let CallResult { + response, + mempool_update, + } = self.single_call(client, call); + + // Apply the mempool update immediately, so that the next + // response will reflect the updated mempool state. + self.mempool_apply(mempool_update); + + response + }) + .collect::>(); + json!(responses) + } + + Calls::Single(call) => { + let CallResult { + response, + mempool_update, + } = self.single_call(client, call); + + self.mempool_apply(mempool_update); + response } - Calls::Single(result) => self.single_call(client, result), } } @@ -538,10 +582,10 @@ impl Rpc { ) } - fn single_call(&self, client: &mut Client, call: Result) -> Value { + fn single_call(&self, client: &mut Client, call: Result) -> CallResult { let call = match call { Ok(call) => call, - Err(response) => return response, // params parsing may fail - the response contains request id + Err(response) => return CallResult::new(response), // params parsing may fail - the response contains request id }; self.rpc_duration.observe_duration(&call.method, || { if self.tracker.status().is_err() { @@ -551,9 +595,11 @@ impl Rpc { | Params::BlockHeaders(_) | Params::HeadersSubscribe | Params::Version(_) => (), - _ => return error_msg(&call.id, RpcError::UnavailableIndex), + _ => return CallResult::new(error_msg(&call.id, RpcError::UnavailableIndex)), }; } + + let mut mempool_update = MempoolSyncUpdate::default(); let result = match &call.params { Params::Banner => Ok(json!(self.banner)), Params::BlockHeader(args) => self.block_header(*args), @@ -571,13 +617,23 @@ impl Rpc { Params::ScriptHashListUnspent(args) => self.scripthash_list_unspent(client, args), Params::ScriptHashSubscribe(args) => self.scripthash_subscribe(client, args), Params::ScriptHashUnsubscribe(args) => self.scripthash_unsubscribe(client, args), - Params::TransactionBroadcast(args) => self.transaction_broadcast(args), + Params::TransactionBroadcast(args) => { + self.transaction_broadcast(args) + .map(|(result, sync_update)| { + mempool_update = sync_update; // extract the mempool sync update + result + }) + } Params::TransactionGet(args) => self.transaction_get(args), Params::TransactionGetMerkle(args) => self.transaction_get_merkle(args), Params::TransactionFromPosition(args) => self.transaction_from_pos(*args), Params::Version(args) => self.version(args), }; - call.response(result) + + CallResult { + response: call.response(result), + mempool_update, + } }) } } diff --git a/src/mempool.rs b/src/mempool.rs index 2c6ff48d4..e75ebd6c0 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -25,6 +25,22 @@ pub(crate) struct Entry { pub has_unconfirmed_inputs: bool, } +impl Entry { + pub fn new( + txid: Txid, + tx: Transaction, + rpc_entry: bitcoincore_rpc::json::GetMempoolEntryResult, + ) -> Entry { + Entry { + txid, + tx, + vsize: rpc_entry.vsize, + fee: rpc_entry.fees.base, + has_unconfirmed_inputs: !rpc_entry.depends.is_empty(), + } + } +} + /// Mempool current state pub(crate) struct Mempool { entries: HashMap, @@ -39,8 +55,9 @@ pub(crate) struct Mempool { /// An update to [`Mempool`]'s internal state. This can be fetched /// asynchronously using [`MempoolSyncUpdate::poll`], and applied /// using [`Mempool::apply_sync_update`]. +#[derive(Default)] pub(crate) struct MempoolSyncUpdate { - new_entries: Vec, + pub(crate) new_entries: Vec, removed_entries: HashSet, } diff --git a/src/server.rs b/src/server.rs index 008fa6b97..cda0c6207 100644 --- a/src/server.rs +++ b/src/server.rs @@ -164,7 +164,7 @@ fn serve() -> Result<()> { let rest = server_rx.iter().take(server_rx.len()); let events: Vec = first.chain(rest).collect(); server_batch_size.observe("recv", events.len() as f64); - duration.observe_duration("handle", || handle_events(&rpc, &mut peers, events)); + duration.observe_duration("handle", || handle_events(&mut rpc, &mut peers, events)); }, default(config.wait_duration) => (), // sync and update }; @@ -206,7 +206,7 @@ enum Message { Done, } -fn handle_events(rpc: &Rpc, peers: &mut HashMap, events: Vec) { +fn handle_events(rpc: &mut Rpc, peers: &mut HashMap, events: Vec) { let mut events_by_peer = HashMap::>::new(); events .into_iter() @@ -217,7 +217,7 @@ fn handle_events(rpc: &Rpc, peers: &mut HashMap, events: Vec } fn handle_peer_events( - rpc: &Rpc, + rpc: &mut Rpc, peers: &mut HashMap, peer_id: usize, messages: Vec,