Skip to content

Commit

Permalink
chore: make standalone test work
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Jan 21, 2024
1 parent a633ad9 commit 7c74b88
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions tests-integration/src/tests/instance_kafka_wal_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use std::assert_matches::assert_matches;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use client::DEFAULT_CATALOG_NAME;
use common_query::Output;
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use frontend::instance::Instance;
use itertools::Itertools;
use rand::rngs::ThreadRng;
use rand::Rng;
use rstest::rstest;
Expand Down Expand Up @@ -172,14 +174,16 @@ async fn insert_data(tables: &[Arc<Table>], instance: &Arc<Instance>, num_writer
futures::future::join_all((0..num_writers).map(|_| async {
let mut rng = rand::thread_rng();
let table = &tables[rng.gen_range(0..tables.len())];
let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed);
for _ in 0..100 {
for _ in 0..10 {
let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed);
let row = make_row(ts, &mut rng);
assert_matches!(
do_insert(instance, &table.name, row).await,
Output::AffectedRows(1)
);
{
// Inserting into the `inserted` vector and inserting into the database are not atomic
// which requires us to do a sorting upon checking data integrity.
let mut inserted = table.inserted.lock().await;
inserted.push(ts);
}
Expand All @@ -203,8 +207,15 @@ async fn ensure_data_exists(tables: &[Arc<Table>], instance: &Arc<Instance>) {
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let inserted = table.inserted.lock().await;
assert_eq!(queried, *inserted);
let inserted = table
.inserted
.lock()
.await
.iter()
.sorted()
.cloned()
.collect::<Vec<_>>();
assert_eq!(queried, inserted);
}))
.await;
}
Expand All @@ -229,10 +240,7 @@ async fn do_create(instance: &Arc<Instance>, table_name: &str) -> Output {
async fn do_alter(instance: &Arc<Instance>, table_name: &str, new_table_name: &str) -> Output {
execute_sql(
instance,
&format!(
"alter table test.{} rename test.{}",
table_name, new_table_name
),
&format!("alter table {} rename {}", table_name, new_table_name),
)
.await
}
Expand All @@ -255,7 +263,7 @@ async fn do_query(instance: &Arc<Instance>, table_name: &str) -> Output {

async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
instance
.do_query(sql, QueryContext::arc())
.do_query(sql, QueryContext::with(DEFAULT_CATALOG_NAME, "test"))
.await
.remove(0)
.unwrap()
Expand Down

0 comments on commit 7c74b88

Please sign in to comment.