Skip to content

Commit

Permalink
remove QD conflicting tx retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed Dec 18, 2024
1 parent 53dc802 commit 5891ae8
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 419 deletions.
122 changes: 27 additions & 95 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,6 @@ pub enum AggregatorProcessTransactionError {
BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
},

#[error(
"Validators returned conflicting transactions but it is potentially recoverable. Locked objects: {:?}. Validator errors: {:?}",
conflicting_tx_digests,
errors,
)]
RetryableConflictingTransaction {
conflicting_tx_digest_to_retry: Option<TransactionDigest>,
errors: GroupedErrors,
conflicting_tx_digests:
BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
},

#[error(
"{} of the validators by stake are overloaded with transactions pending execution. Validator errors: {:?}",
overloaded_stake,
Expand Down Expand Up @@ -356,44 +344,24 @@ struct ProcessTransactionState {
overloaded_stake: StakeUnit,
// Validators that are overloaded and request client to retry.
retryable_overload_info: RetryableOverloadInfo,
// If there are conflicting transactions, we note them down and may attempt to retry
// If there are conflicting transactions, we note them down to report to user.
conflicting_tx_digests:
BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
// As long as none of the exit criteria are met we consider the state retryable
// 1) >= 2f+1 signatures
// 2) >= f+1 non-retryable errors
// 3) >= 2f+1 object not found errors
// Note: For conflicting transactions we collect as many responses as possible
// before we know for sure no tx can reach quorum. Namely, stake of the most
// promising tx + retryable stake < 2f+1.
retryable: bool,
tx_finalized_with_different_user_sig: bool,

conflicting_tx_total_stake: StakeUnit,
most_staked_conflicting_tx_stake: StakeUnit,
}

impl ProcessTransactionState {
#[allow(clippy::type_complexity)]
pub fn conflicting_tx_digest_with_most_stake(
&self,
) -> Option<(
TransactionDigest,
&Vec<(AuthorityName, ObjectRef)>,
StakeUnit,
)> {
self.conflicting_tx_digests
.iter()
.max_by_key(|(_, (_, stake))| *stake)
.map(|(digest, (validators, stake))| (*digest, validators, *stake))
}

pub fn record_conflicting_transaction_if_any(
&mut self,
validator_name: AuthorityName,
weight: StakeUnit,
err: &SuiError,
) -> bool {
) {
if let SuiError::ObjectLockConflict {
obj_ref,
pending_transaction: transaction,
Expand All @@ -405,13 +373,7 @@ impl ProcessTransactionState {
.or_insert((Vec::new(), 0));
lock_records.push((validator_name, *obj_ref));
*total_stake += weight;
self.conflicting_tx_total_stake += weight;
if *total_stake > self.most_staked_conflicting_tx_stake {
self.most_staked_conflicting_tx_stake = *total_stake;
}
return true;
}
false
}

pub fn check_if_error_indicates_tx_finalized_with_different_user_sig(
Expand Down Expand Up @@ -1048,9 +1010,7 @@ where
retryable_overload_info: Default::default(),
retryable: true,
conflicting_tx_digests: Default::default(),
conflicting_tx_total_stake: 0,
tx_finalized_with_different_user_sig: false,
most_staked_conflicting_tx_stake: 0,
};

let transaction_ref = &transaction;
Expand Down Expand Up @@ -1092,6 +1052,8 @@ where
.with_label_values(&[&display_name, err.as_ref()])
.inc();
Self::record_rpc_error_maybe(self.metrics.clone(), &display_name, &err);
// Record conflicting transactions if any to report to user.
state.record_conflicting_transaction_if_any(name, weight, &err);
let (retryable, categorized) = err.is_retryable();
if !categorized {
// TODO: Should minimize possible uncategorized errors here
Expand Down Expand Up @@ -1124,9 +1086,7 @@ where
// code path to handle both overload scenarios.
state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
}
else if !retryable && !state.record_conflicting_transaction_if_any(name, weight, &err) {
// We don't count conflicting transactions as non-retryable errors here
// because its handling is a bit different.
else if !retryable {
state.non_retryable_stake += weight;
}
state.errors.push((err, vec![name], weight));
Expand All @@ -1136,12 +1096,10 @@ where

let retryable_stake = self.get_retryable_stake(&state);
let good_stake = std::cmp::max(state.tx_signatures.total_votes(), state.effects_map.total_votes());
let stake_of_most_promising_tx = std::cmp::max(good_stake, state.most_staked_conflicting_tx_stake);
if stake_of_most_promising_tx + retryable_stake < quorum_threshold {
if good_stake + retryable_stake < quorum_threshold {
debug!(
tx_digest = ?tx_digest,
good_stake,
most_staked_conflicting_tx_stake =? state.most_staked_conflicting_tx_stake,
retryable_stake,
"No chance for any tx to get quorum, exiting. Conflicting_txes: {:?}",
state.conflicting_tx_digests
Expand Down Expand Up @@ -1174,7 +1132,7 @@ where
Err(state) => {
self.record_process_transaction_metrics(tx_digest, &state);
let state = self.record_non_quorum_effects_maybe(tx_digest, state);
Err(self.handle_process_transaction_error(tx_digest, state))
Err(self.handle_process_transaction_error(state))
}
}
}
Expand All @@ -1194,7 +1152,6 @@ where

fn handle_process_transaction_error(
&self,
original_tx_digest: &TransactionDigest,
state: ProcessTransactionState,
) -> AggregatorProcessTransactionError {
let quorum_threshold = self.committee.quorum_threshold();
Expand All @@ -1207,49 +1164,6 @@ where
};
}

// Handle possible conflicts first as `FatalConflictingTransaction` is
// more meaningful than `FatalTransaction`.
if let Some((most_staked_conflicting_tx, validators, most_staked_conflicting_tx_stake)) =
state.conflicting_tx_digest_with_most_stake()
{
let good_stake = state.tx_signatures.total_votes();
let retryable_stake = self.get_retryable_stake(&state);

if good_stake + retryable_stake >= quorum_threshold {
return AggregatorProcessTransactionError::RetryableConflictingTransaction {
errors: group_errors(state.errors),
conflicting_tx_digest_to_retry: None,
conflicting_tx_digests: state.conflicting_tx_digests,
};
}

if most_staked_conflicting_tx_stake + retryable_stake >= quorum_threshold {
return AggregatorProcessTransactionError::RetryableConflictingTransaction {
errors: group_errors(state.errors),
conflicting_tx_digest_to_retry: Some(most_staked_conflicting_tx),
conflicting_tx_digests: state.conflicting_tx_digests,
};
}

warn!(
?state.conflicting_tx_digests,
?most_staked_conflicting_tx,
?original_tx_digest,
original_tx_stake = good_stake,
most_staked_conflicting_tx_stake = most_staked_conflicting_tx_stake,
"Client double spend attempt detected: {:?}",
validators
);
self.metrics
.total_client_double_spend_attempts_detected
.inc();

return AggregatorProcessTransactionError::FatalConflictingTransaction {
errors: group_errors(state.errors),
conflicting_tx_digests: state.conflicting_tx_digests,
};
}

if !state.retryable {
if state.tx_finalized_with_different_user_sig
|| state.check_if_error_indicates_tx_finalized_with_different_user_sig(
Expand All @@ -1258,6 +1172,25 @@ where
{
return AggregatorProcessTransactionError::TxAlreadyFinalizedWithDifferentUserSignatures;
}

// Handle conflicts first, since `FatalConflictingTransaction` is
// more meaningful than `FatalTransaction`.
if !state.conflicting_tx_digests.is_empty() {
let good_stake = state.tx_signatures.total_votes();
warn!(
?state.conflicting_tx_digests,
original_tx_stake = good_stake,
"Client double spend attempt detected!",
);
self.metrics
.total_client_double_spend_attempts_detected
.inc();
return AggregatorProcessTransactionError::FatalConflictingTransaction {
errors: group_errors(state.errors),
conflicting_tx_digests: state.conflicting_tx_digests,
};
}

return AggregatorProcessTransactionError::FatalTransaction {
errors: group_errors(state.errors),
};
Expand All @@ -1281,7 +1214,7 @@ where
};
}

// No conflicting transaction, the system is not overloaded and transaction state is still retryable.
// The system is not overloaded and transaction state is still retryable.
AggregatorProcessTransactionError::RetryableTransaction {
errors: group_errors(state.errors),
}
Expand Down Expand Up @@ -1483,7 +1416,6 @@ where

fn get_retryable_stake(&self, state: &ProcessTransactionState) -> StakeUnit {
self.committee.total_votes()
- state.conflicting_tx_total_stake
- state.non_retryable_stake
- state.effects_map.total_votes()
- state.tx_signatures.total_votes()
Expand Down
34 changes: 4 additions & 30 deletions crates/sui-core/src/quorum_driver/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ pub struct QuorumDriverMetrics {
// TODO: add histogram of attempt that tx succeeds
pub(crate) current_requests_in_flight: IntGauge,

pub(crate) total_err_process_tx_responses_with_nonzero_conflicting_transactions: IntCounter,
pub(crate) total_attempts_retrying_conflicting_transaction: IntCounter,
pub(crate) total_successful_attempts_retrying_conflicting_transaction: IntCounter,
pub(crate) total_times_conflicting_transaction_already_finalized_when_retrying: IntCounter,
pub(crate) total_retryable_overload_errors: IntCounter,
pub(crate) transaction_retry_count: Histogram,
pub(crate) current_transactions_in_retry: IntGauge,
Expand Down Expand Up @@ -72,37 +68,14 @@ impl QuorumDriverMetrics {
"Total attempt times of ok response",
mysten_metrics::COUNT_BUCKETS.to_vec(),
registry,
).unwrap(),
)
.unwrap(),
current_requests_in_flight: register_int_gauge_with_registry!(
"current_requests_in_flight",
"Current number of requests being processed in QuorumDriver",
registry,
)
.unwrap(),
total_err_process_tx_responses_with_nonzero_conflicting_transactions: register_int_counter_with_registry!(
"quorum_driver_total_err_process_tx_responses_with_nonzero_conflicting_transactions",
"Total number of err process_tx responses with non empty conflicting transactions",
registry,
)
.unwrap(),
total_attempts_retrying_conflicting_transaction: register_int_counter_with_registry!(
"quorum_driver_total_attempts_trying_conflicting_transaction",
"Total number of attempts to retry a conflicting transaction",
registry,
)
.unwrap(),
total_successful_attempts_retrying_conflicting_transaction: register_int_counter_with_registry!(
"quorum_driver_total_successful_attempts_trying_conflicting_transaction",
"Total number of successful attempts to retry a conflicting transaction",
registry,
)
.unwrap(),
total_times_conflicting_transaction_already_finalized_when_retrying: register_int_counter_with_registry!(
"quorum_driver_total_times_conflicting_transaction_already_finalized_when_retrying",
"Total number of times the conflicting transaction is already finalized when retrying",
registry,
)
.unwrap(),
total_retryable_overload_errors: register_int_counter_with_registry!(
"quorum_driver_total_retryable_overload_errors",
"Total number of transactions experiencing retryable overload error",
Expand All @@ -114,7 +87,8 @@ impl QuorumDriverMetrics {
"Histogram of transaction retry count",
mysten_metrics::COUNT_BUCKETS.to_vec(),
registry,
).unwrap(),
)
.unwrap(),
current_transactions_in_retry: register_int_gauge_with_registry!(
"current_transactions_in_retry",
"Current number of transactions in retry loop in QuorumDriver",
Expand Down
Loading

0 comments on commit 5891ae8

Please sign in to comment.