Skip to content

Commit

Permalink
[Benchmark] Allow parallel txn generation when transaction generator …
Browse files Browse the repository at this point in the history
…is provided. (aptos-labs#10463)
  • Loading branch information
grao1991 authored Oct 13, 2023
1 parent 5b08089 commit 106a644
Show file tree
Hide file tree
Showing 16 changed files with 84 additions and 31 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ test-case = "3.1.0"
textwrap = "0.15.0"
thiserror = "1.0.37"
threadpool = "1.8.1"
thread_local = "1.1.7"
time = { version = "0.3.24", features = ["serde"] }
tiny-bip39 = "0.8.2"
tiny-keccak = { version = "2.0.2", features = ["keccak", "sha3"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-emitter-lib/src/emitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ impl TxnEmitter {
let stats = Arc::new(DynamicStatsTracking::new(stats_tracking_phases));
let tokio_handle = Handle::current();

let (mut txn_generator_creator, _, _) = create_txn_generator_creator(
let (txn_generator_creator, _, _) = create_txn_generator_creator(
&req.transaction_mix_per_phase,
&mut all_accounts,
vec![],
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-generator-lib/src/account_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl AccountGeneratorCreator {
}

impl TransactionGeneratorCreator for AccountGeneratorCreator {
fn create_transaction_generator(&mut self) -> Box<dyn TransactionGenerator> {
fn create_transaction_generator(&self) -> Box<dyn TransactionGenerator> {
Box::new(AccountGenerator::new(
StdRng::from_entropy(),
self.txn_factory.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl AccountsPoolWrapperCreator {
}

impl TransactionGeneratorCreator for AccountsPoolWrapperCreator {
fn create_transaction_generator(&mut self) -> Box<dyn TransactionGenerator> {
fn create_transaction_generator(&self) -> Box<dyn TransactionGenerator> {
Box::new(AccountsPoolWrapperGenerator::new(
self.creator.create_transaction_generator(),
self.accounts_pool.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-generator-lib/src/batch_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl BatchTransferTransactionGeneratorCreator {
}

impl TransactionGeneratorCreator for BatchTransferTransactionGeneratorCreator {
fn create_transaction_generator(&mut self) -> Box<dyn TransactionGenerator> {
fn create_transaction_generator(&self) -> Box<dyn TransactionGenerator> {
Box::new(BatchTransferTransactionGenerator::new(
StdRng::from_entropy(),
self.batch_size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl CustomModulesDelegationGeneratorCreator {
}

impl TransactionGeneratorCreator for CustomModulesDelegationGeneratorCreator {
fn create_transaction_generator(&mut self) -> Box<dyn TransactionGenerator> {
fn create_transaction_generator(&self) -> Box<dyn TransactionGenerator> {
Box::new(CustomModulesDelegationGenerator::new(
StdRng::from_entropy(),
self.txn_factory.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-generator-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub trait TransactionGenerator: Sync + Send {

#[async_trait]
pub trait TransactionGeneratorCreator: Sync + Send {
fn create_transaction_generator(&mut self) -> Box<dyn TransactionGenerator>;
fn create_transaction_generator(&self) -> Box<dyn TransactionGenerator>;
}

pub struct CounterState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl P2PTransactionGeneratorCreator {
}

impl TransactionGeneratorCreator for P2PTransactionGeneratorCreator {
fn create_transaction_generator(&mut self) -> Box<dyn TransactionGenerator> {
fn create_transaction_generator(&self) -> Box<dyn TransactionGenerator> {
let rng = StdRng::from_entropy();
let sampler: Box<dyn Sampler<AccountAddress>> = match self.sampling_mode {
SamplingMode::Basic => Box::new(BasicSampler::new()),
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-generator-lib/src/publish_modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl PublishPackageCreator {
}

impl TransactionGeneratorCreator for PublishPackageCreator {
fn create_transaction_generator(&mut self) -> Box<dyn TransactionGenerator> {
fn create_transaction_generator(&self) -> Box<dyn TransactionGenerator> {
Box::new(PublishPackageGenerator::new(
StdRng::from_entropy(),
self.package_handler.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ impl PhasedTxnMixGeneratorCreator {
}

impl TransactionGeneratorCreator for PhasedTxnMixGeneratorCreator {
fn create_transaction_generator(&mut self) -> Box<dyn TransactionGenerator> {
fn create_transaction_generator(&self) -> Box<dyn TransactionGenerator> {
let mut txn_mix_per_phase = Vec::<Vec<(Box<dyn TransactionGenerator>, usize)>>::new();
for txn_mix_creators in self.txn_mix_per_phase_creators.iter_mut() {
for txn_mix_creators in self.txn_mix_per_phase_creators.iter() {
let mut txn_mix = Vec::<(Box<dyn TransactionGenerator>, usize)>::new();
for (generator_creator, weight) in txn_mix_creators.iter_mut() {
for (generator_creator, weight) in txn_mix_creators.iter() {
txn_mix.push((generator_creator.create_transaction_generator(), *weight));
}
txn_mix_per_phase.push(txn_mix);
Expand Down
1 change: 1 addition & 0 deletions execution/executor-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ once_cell = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
thread_local = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }

Expand Down
14 changes: 13 additions & 1 deletion execution/executor-benchmark/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

#![forbid(unsafe_code)]

use aptos_metrics_core::{exponential_buckets, register_histogram_vec, HistogramVec};
use aptos_metrics_core::{
exponential_buckets, register_histogram_vec, register_int_counter_vec, HistogramVec,
IntCounterVec,
};
use once_cell::sync::Lazy;

pub static TIMER: Lazy<HistogramVec> = Lazy::new(|| {
Expand All @@ -15,3 +18,12 @@ pub static TIMER: Lazy<HistogramVec> = Lazy::new(|| {
)
.unwrap()
});

pub static NUM_TXNS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_executor_benchmark_num_txns",
"# of transactions received by each stage.",
&["stage"]
)
.unwrap()
});
15 changes: 14 additions & 1 deletion execution/executor-benchmark/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
block_preparation::BlockPreparationStage, ledger_update_stage::LedgerUpdateStage,
GasMesurement, TransactionCommitter, TransactionExecutor,
metrics::NUM_TXNS, GasMesurement, TransactionCommitter, TransactionExecutor,
};
use aptos_block_partitioner::v2::config::PartitionerV2Config;
use aptos_crypto::HashValue;
Expand Down Expand Up @@ -127,6 +127,9 @@ where
.name("block_partitioning".to_string())
.spawn(move || {
while let Ok(txns) = raw_block_receiver.recv() {
NUM_TXNS
.with_label_values(&["partition"])
.inc_by(txns.len() as u64);
let exe_block_msg = partitioning_stage.process(txns);
executable_block_sender.send(exe_block_msg).unwrap();
}
Expand All @@ -148,6 +151,9 @@ where
block,
} = msg;
let block_size = block.transactions.num_transactions();
NUM_TXNS
.with_label_values(&["execution"])
.inc_by(block_size as u64);
info!("Received block of size {:?} to execute", block_size);
executed += block_size;
exe.execute_block(current_block_start_time, partition_time, block);
Expand Down Expand Up @@ -182,6 +188,13 @@ where
.name("ledger_update".to_string())
.spawn(move || {
while let Ok(ledger_update_msg) = ledger_update_receiver.recv() {
let block_size = ledger_update_msg
.state_checkpoint_output
.txn_statuses()
.len();
NUM_TXNS
.with_label_values(&["ledger_update"])
.inc_by(block_size as u64);
ledger_update_stage.ledger_update(ledger_update_msg);
}
})
Expand Down
5 changes: 4 additions & 1 deletion execution/executor-benchmark/src/transaction_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::pipeline::CommitBlockMessage;
use crate::{metrics::NUM_TXNS, pipeline::CommitBlockMessage};
use aptos_crypto::hash::HashValue;
use aptos_db::metrics::API_LATENCY_SECONDS;
use aptos_executor::{
Expand Down Expand Up @@ -83,6 +83,9 @@ where
execution_time,
num_txns,
} = msg;
NUM_TXNS
.with_label_values(&["commit"])
.inc_by(num_txns as u64);
self.version += num_txns as u64;
let commit_start = std::time::Instant::now();
let ledger_info_with_sigs = gen_li_with_sigs(block_id, root_hash, self.version);
Expand Down
51 changes: 36 additions & 15 deletions execution/executor-benchmark/src/transaction_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::account_generator::{AccountCache, AccountGenerator};
use crate::{
account_generator::{AccountCache, AccountGenerator},
metrics::{NUM_TXNS, TIMER},
};
use aptos_crypto::{ed25519::Ed25519PrivateKey, HashValue};
use aptos_logger::info;
use aptos_sdk::{transaction_builder::TransactionFactory, types::LocalAccount};
Expand All @@ -27,13 +30,15 @@ use serde::{Deserialize, Serialize};
#[cfg(test)]
use std::collections::HashSet;
use std::{
cell::RefCell,
collections::{BTreeMap, HashMap},
fs::File,
io::{Read, Write},
iter::once,
path::Path,
sync::{mpsc, Arc},
};
use thread_local::ThreadLocal;

const META_FILENAME: &str = "metadata.toml";
pub const MAX_ACCOUNTS_INVOLVED_IN_P2P: usize = 1_000_000;
Expand Down Expand Up @@ -290,33 +295,44 @@ impl TransactionGenerator {
&mut self,
block_size: usize,
num_blocks: usize,
mut transaction_generator_creator: Box<dyn TransactionGeneratorCreator>,
transaction_generator_creator: Box<dyn TransactionGeneratorCreator>,
transactions_per_sender: usize,
) {
assert!(self.block_sender.is_some());
let num_senders_per_block =
(block_size + transactions_per_sender - 1) / transactions_per_sender;
let account_pool_size = self.main_signer_accounts.as_ref().unwrap().accounts.len();
let mut transaction_generator =
transaction_generator_creator.create_transaction_generator();
let transaction_generator = ThreadLocal::with_capacity(self.num_workers);
for _ in 0..num_blocks {
let transactions: Vec<_> = rand::seq::index::sample(
let sender_indices = rand::seq::index::sample(
&mut thread_rng(),
account_pool_size,
num_senders_per_block,
)
.into_iter()
.flat_map(|idx| {
let sender = &self.main_signer_accounts.as_mut().unwrap().accounts[idx];
transaction_generator.generate_transactions(sender, transactions_per_sender)
})
.map(Transaction::UserTransaction)
.chain(once(Transaction::StateCheckpoint(HashValue::random())))
.flat_map(|sender_idx| vec![sender_idx; transactions_per_sender])
.collect();

if let Some(sender) = &self.block_sender {
sender.send(transactions).unwrap();
}
self.generate_and_send_block(
self.main_signer_accounts.as_ref().unwrap(),
sender_indices,
|sender_idx, _| {
let sender = &self.main_signer_accounts.as_ref().unwrap().accounts[sender_idx];
let mut transaction_generator = transaction_generator
.get_or(|| {
RefCell::new(
transaction_generator_creator.create_transaction_generator(),
)
})
.borrow_mut();
Transaction::UserTransaction(
transaction_generator
.generate_transactions(sender, 1)
.pop()
.unwrap(),
)
},
|sender_idx| *sender_idx,
);
}
}

Expand Down Expand Up @@ -624,6 +640,7 @@ impl TransactionGenerator {
F: Fn(T, &AccountCache) -> Transaction + Send + Sync,
S: Fn(&T) -> usize,
{
let _timer = TIMER.with_label_values(&["generate_block"]).start_timer();
let block_size = inputs.len();
let mut jobs = Vec::new();
jobs.resize_with(self.num_workers, BTreeMap::new);
Expand Down Expand Up @@ -655,6 +672,10 @@ impl TransactionGenerator {

transactions.push(Transaction::StateCheckpoint(HashValue::random()));

NUM_TXNS
.with_label_values(&["generation_done"])
.inc_by(transactions.len() as u64);

if let Some(sender) = &self.block_sender {
sender.send(transactions).unwrap();
}
Expand Down

0 comments on commit 106a644

Please sign in to comment.