Skip to content

Commit

Permalink
fix(storagext): allow extrinsics to be submitted in parallel (#643)
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder authored Dec 18, 2024
1 parent f85fded commit 4d89958
Showing 1 changed file with 70 additions and 21 deletions.
91 changes: 70 additions & 21 deletions storagext/lib/src/runtime/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ use std::time::Duration;
use codec::Encode;
use hex::ToHex;
use subxt::{
backend::rpc::reconnecting_rpc_client::{FixedInterval, RpcClient},
backend::{
legacy::LegacyRpcMethods,
rpc::reconnecting_rpc_client::{FixedInterval, RpcClient},
},
blocks::Block,
config::DefaultExtrinsicParamsBuilder,
events::Events,
utils::H256,
OnlineClient,
};
use tokio::sync::Mutex;

use crate::PolkaStorageConfig;

Expand All @@ -32,6 +37,10 @@ where
/// You can call any extrinsic via [`Client::traced_submission`].
pub struct Client {
pub(crate) client: OnlineClient<PolkaStorageConfig>,
pub(crate) legacy_rpc: LegacyRpcMethods<PolkaStorageConfig>,
/// We're not using AtomicU64 as we need to hold the critical sections across many instructions.
/// Look at [`Self::traced_submission`].
last_sent_nonce: Mutex<u64>,
}

impl Client {
Expand All @@ -54,7 +63,9 @@ impl Client {
.map_err(|e| subxt::error::RpcError::ClientError(Box::new(e)))?;

Ok(Self {
client: OnlineClient::<_>::from_rpc_client(rpc_client).await?,
client: OnlineClient::<_>::from_rpc_client(rpc_client.clone()).await?,
legacy_rpc: LegacyRpcMethods::<_>::new(rpc_client.into()),
last_sent_nonce: Mutex::new(0),
})
}

Expand Down Expand Up @@ -85,9 +96,35 @@ impl Client {
}

/// Submit an extrinsic and wait for finalization, returning the block hash it was included in.
/// It is thread-safe, allows to submit multiple extrinsics at the same time.
/// If another process is submitting the transactions at the same time, the retry mechanism at the higher layer is needed.
///
/// Equivalent to performing [`OnlineClient::sign_and_submit_then_watch_default`],
/// followed by [`TxInBlock::wait_for_finalized`] and [`TxInBlock::wait_for_success`].
///
/// ## Nonce mechanism
///
/// ### Context
/// Each transaction sent to the blockchain must have a nonce. Nonces are incremented sequentially and cannot have gaps.
/// If you submit a transaction with the same nonce, one of them will fail or be replaced. Dependent on the priority (transaction size).
///
/// ### Solution
///
/// The current solution for this is optimistic. It is fetching the nonce using `system_account_next_index` from the **best block** and using it as a nonce.
/// Returned index is taking into the account transactions already included in the blocks and the ones pending (in the transaction pool).
/// To avoid the race condition between the tasks in the same process a critical section is introduced.
/// It locks the extrinsic submission, so the next task is allowed to fetch the next index only after the previous has been submitted (txpool updated).
///
/// 1. We assume we connect to the same node for each transaction performed, if we didn't, then the possibility of nonce collisions would be more frequent.
/// 2. When we `.submit()` a transaction and it fails, the nonce is not updated, so next time we call `system_account_next_index`, it'll return the same nonce.
/// 3. When we `.submit()` a transaction and it succeeds, the nonce is updated, next returned nonce will be incremented.
/// 4. If any other process submits the transaction, after we fetch the current_nonce, this call will:
/// a) fail (transaction outdated)
/// b) fail (will be replaced by the other process transaction)
/// c) succeed (replace the other process transaction)
/// 5. Because of the 1. and 4., the retry mechanism would be needed and the error is detectable:
/// a) at the `.submit()` level, when nonce < chain_nonce OR nonce == chain_nonce && tx1_priority < tx2_priority.
/// b) only after waiting for finalization and not getting the event (TimeoutError).
pub(crate) async fn traced_submission<Call, Keypair>(
&self,
call: &Call,
Expand All @@ -98,26 +135,44 @@ impl Client {
Call: subxt::tx::Payload,
Keypair: subxt::tx::Signer<PolkaStorageConfig>,
{
if wait_for_finalization {
// Critical section
let submitted_extrinsic_hash = {
let mut last_sent_nonce = self.last_sent_nonce.lock().await;
let current_nonce = self
.legacy_rpc
.system_account_next_index(&account_keypair.account_id())
.await?;
let current_header = self.legacy_rpc.chain_get_header(None).await?.unwrap();
let ext_params = DefaultExtrinsicParamsBuilder::new()
.mortal(&current_header, 8)
.nonce(current_nonce)
.build();

let submitted_extrinsic_hash = self
.client
.tx()
.sign_and_submit_default(call, account_keypair)
.create_signed_offline(call, account_keypair, ext_params)?
.submit()
.await?;

tracing::debug!(
"Previous nonce: {}, next nonce: {}",
last_sent_nonce,
current_nonce
);
*last_sent_nonce = current_nonce;

submitted_extrinsic_hash
};

if wait_for_finalization {
self.traced_submission_with_finalization(submitted_extrinsic_hash)
.await
.map(Option::Some)
} else {
tracing::trace!("submitting extrinsic");
let extrinsic_hash = self
.client
.tx()
.sign_and_submit_default(call, account_keypair)
.await?;

tracing::trace!(
extrinsic_hash = extrinsic_hash.encode_hex::<String>(),
"waiting for finalization"
extrinsic_hash = submitted_extrinsic_hash.encode_hex::<String>(),
"extrinsic published, not waiting for the finalization"
);
Ok(None)
}
Expand Down Expand Up @@ -186,9 +241,9 @@ impl Client {
}
});

// 1 block = 6 seconds -> 120 seconds = 20 blocks
// 1 block = 6 seconds -> 60 seconds = 10 blocks
// since the subscription has like a ~6 block delay
let timeout = tokio::time::timeout(Duration::from_secs(120), finalized_block).await;
let timeout = tokio::time::timeout(Duration::from_secs(60), finalized_block).await;

match timeout {
Ok(Ok(result)) => {
Expand All @@ -206,9 +261,3 @@ impl Client {
}
}
}

impl From<OnlineClient<PolkaStorageConfig>> for Client {
fn from(client: OnlineClient<PolkaStorageConfig>) -> Self {
Self { client }
}
}

0 comments on commit 4d89958

Please sign in to comment.