Skip to content

Commit

Permalink
fix: apply suggestions from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Jan 23, 2024
1 parent 3e73e53 commit 5f9af55
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions tests-integration/src/tests/instance_kafka_wal_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ async fn test_flush_then_replay(rebuildable_instance: Option<Box<dyn Rebuildable
do_alter(&instance, &table.name, &new_table_name).await,
Output::AffectedRows(0)
);
Arc::new(Table {
Table {
name: new_table_name,
logical_timer: AtomicU64::new(table.logical_timer.load(Ordering::Relaxed)),
inserted: Mutex::new(table.inserted.lock().await.clone()),
})
}
}
}))
.await;
Expand All @@ -168,11 +168,7 @@ async fn test_flush_then_replay(rebuildable_instance: Option<Box<dyn Rebuildable
ensure_data_exists(&tables, &instance).await;
}

async fn create_tables(
test_name: &str,
instance: &Arc<Instance>,
num_tables: usize,
) -> Vec<Arc<Table>> {
async fn create_tables(test_name: &str, instance: &Arc<Instance>, num_tables: usize) -> Vec<Table> {
futures::future::join_all((0..num_tables).map(|i| {
let instance = instance.clone();
async move {
Expand All @@ -181,17 +177,17 @@ async fn create_tables(
do_create(&instance, &table_name).await,
Output::AffectedRows(0)
);
Arc::new(Table {
Table {
name: table_name,
logical_timer: AtomicU64::new(1685508715000),
inserted: Mutex::new(Vec::new()),
})
}
}
}))
.await
}

async fn insert_data(tables: &[Arc<Table>], instance: &Arc<Instance>, num_writers: usize) {
async fn insert_data(tables: &[Table], instance: &Arc<Instance>, num_writers: usize) {
// Each writer randomly chooses a table and inserts a sequence of rows into the table.
futures::future::join_all((0..num_writers).map(|_| async {
let mut rng = rand::thread_rng();
Expand All @@ -214,7 +210,7 @@ async fn insert_data(tables: &[Arc<Table>], instance: &Arc<Instance>, num_writer
.await;
}

async fn ensure_data_exists(tables: &[Arc<Table>], instance: &Arc<Instance>) {
async fn ensure_data_exists(tables: &[Table], instance: &Arc<Instance>) {
futures::future::join_all(tables.iter().map(|table| async {
let output = do_query(instance, &table.name).await;
let Output::Stream(stream) = output else {
Expand Down

0 comments on commit 5f9af55

Please sign in to comment.