diff --git a/tests-integration/src/tests/instance_kafka_wal_test.rs b/tests-integration/src/tests/instance_kafka_wal_test.rs
index dc4089f54a33..80bdcc95ad82 100644
--- a/tests-integration/src/tests/instance_kafka_wal_test.rs
+++ b/tests-integration/src/tests/instance_kafka_wal_test.rs
@@ -16,9 +16,11 @@ use std::assert_matches::assert_matches;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
+use client::DEFAULT_CATALOG_NAME;
use common_query::Output;
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use frontend::instance::Instance;
+use itertools::Itertools;
use rand::rngs::ThreadRng;
use rand::Rng;
use rstest::rstest;
@@ -172,14 +174,16 @@ async fn insert_data(tables: &[Arc
], instance: &Arc, num_writer
futures::future::join_all((0..num_writers).map(|_| async {
let mut rng = rand::thread_rng();
let table = &tables[rng.gen_range(0..tables.len())];
- let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed);
- for _ in 0..100 {
+ for _ in 0..10 {
+ let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed);
let row = make_row(ts, &mut rng);
assert_matches!(
do_insert(instance, &table.name, row).await,
Output::AffectedRows(1)
);
{
+ // Inserting into the `inserted` vector and inserting into the database are not atomic
+ // which requires us to do a sorting upon checking data integrity.
let mut inserted = table.inserted.lock().await;
inserted.push(ts);
}
@@ -203,8 +207,15 @@ async fn ensure_data_exists(tables: &[Arc], instance: &Arc) {
.collect::>()
})
.collect::>();
- let inserted = table.inserted.lock().await;
- assert_eq!(queried, *inserted);
+ let inserted = table
+ .inserted
+ .lock()
+ .await
+ .iter()
+ .sorted()
+ .cloned()
+ .collect::>();
+ assert_eq!(queried, inserted);
}))
.await;
}
@@ -229,10 +240,7 @@ async fn do_create(instance: &Arc, table_name: &str) -> Output {
async fn do_alter(instance: &Arc, table_name: &str, new_table_name: &str) -> Output {
execute_sql(
instance,
- &format!(
- "alter table test.{} rename test.{}",
- table_name, new_table_name
- ),
+ &format!("alter table {} rename {}", table_name, new_table_name),
)
.await
}
@@ -255,7 +263,7 @@ async fn do_query(instance: &Arc, table_name: &str) -> Output {
async fn execute_sql(instance: &Arc, sql: &str) -> Output {
instance
- .do_query(sql, QueryContext::arc())
+ .do_query(sql, QueryContext::with(DEFAULT_CATALOG_NAME, "test"))
.await
.remove(0)
.unwrap()