Skip to content

Commit

Permalink
fix: rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Jan 18, 2024
1 parent baa8f19 commit 9a816ba
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 35 deletions.
2 changes: 2 additions & 0 deletions tests-integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(assert_matches)]

pub mod cluster;
mod grpc;
mod influxdb;
Expand Down
67 changes: 33 additions & 34 deletions tests-integration/src/tests/instance_kafka_wal_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use common_query::Output;
use common_recordbatch::util;
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use frontend::error::Result;
use frontend::instance::Instance;
use rand::rngs::ThreadRng;
use rand::Rng;
use rstest::rstest;
use rstest_reuse::apply;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use session::context::QueryContext;
use tokio::sync::Mutex;

use crate::tests::test_util::*;

#[apply(both_instances_cases_with_kafka_wal)]
async fn test_create_database_and_insert_query(instance: Option<Box<dyn RebuildableMockInstance>>) {
let Some(instance) = instance else { return };

async fn test_create_database_and_insert_query(
rebuildable_instance: Option<Box<dyn RebuildableMockInstance>>,
) {
let Some(instance) = rebuildable_instance else {
return;
};
let instance = instance.frontend();

let output = execute_sql(&instance, "create database test").await;
assert!(matches!(output, Output::AffectedRows(1)));
assert_matches!(output, Output::AffectedRows(1));

let output = execute_sql(
&instance,
Expand All @@ -43,25 +49,25 @@ async fn test_create_database_and_insert_query(instance: Option<Box<dyn Rebuilda
memory DOUBLE,
ts timestamp,
TIME INDEX(ts)
)"#,
)"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
assert_matches!(output, Output::AffectedRows(0));

let output = execute_sql(
&instance,
r#"insert into test.demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
assert_matches!(output, Output::AffectedRows(2));

let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
let batches = common_recordbatch::util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(
Arc::new(TimestampMillisecondVector::from_vec(vec![
Expand Down Expand Up @@ -94,7 +100,7 @@ async fn test_replay(rebuildable_instance: Option<Box<dyn RebuildableMockInstanc
insert_data(&tables, &instance, 5).await;
ensure_data_exists(&tables, &instance).await;

// Rebuilds to emulate restart to trigger a replay.
// Rebuilds to emulate restart which then triggers a replay.
let instance = rebuildable_instance.rebuild().await;
ensure_data_exists(&tables, &instance).await;
}
Expand All @@ -121,7 +127,7 @@ async fn test_flush_then_replay(rebuildable_instance: Option<Box<dyn Rebuildable
let new_table_name = format!("{}{}", table.name, table.name.chars().last().unwrap());
assert_matches!(
do_alter(&instance, &table.name, &new_table_name).await,
Output::AffectedRows(1)
Output::AffectedRows(0)
);
Arc::new(Table {
name: new_table_name,
Expand All @@ -133,7 +139,7 @@ async fn test_flush_then_replay(rebuildable_instance: Option<Box<dyn Rebuildable
.await;
ensure_data_exists(&tables, &instance).await;

// Rebuilds to emulate restart to trigger a replay.
// Rebuilds to emulate restart which then triggers a replay.
let instance = rebuildable_instance.rebuild().await;
ensure_data_exists(&tables, &instance).await;
}
Expand All @@ -149,7 +155,7 @@ async fn create_tables(
let table_name = format!("{}_{}", test_name, i);
assert_matches!(
do_create(&instance, &table_name).await,
Output::AffectedRows(1)
Output::AffectedRows(0)
);
Arc::new(Table {
name: table_name,
Expand Down Expand Up @@ -248,23 +254,16 @@ async fn do_query(instance: &Arc<Instance>, table_name: &str) -> Output {
}

async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
execute_sql_with(instance, sql, QueryContext::arc()).await
}

async fn try_execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Result<Output> {
instance.do_query(sql, query_ctx).await.remove(0)
}

async fn execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Output {
try_execute_sql_with(instance, sql, query_ctx)
instance
.do_query(sql, QueryContext::arc())
.await
.remove(0)
.unwrap()
}

fn make_row(ts: u64, rng: &mut ThreadRng) -> String {
let host = format!("host{}", rng.gen_range(0..5));
let cpu: f64 = rng.gen_range(0.0..99.9);
let memory: f64 = rng.gen_range(0.0..999.9);
format!("('{host}', {cpu}, {memory}, {ts})")
}
2 changes: 1 addition & 1 deletion tests-integration/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMo
pub(crate) fn both_instances_cases_with_kafka_wal(
#[future]
#[case]
instance: Option<Box<dyn RebuildableMockInstance>>,
rebuildable_instance: Option<Box<dyn RebuildableMockInstance>>,
) {
}

Expand Down

0 comments on commit 9a816ba

Please sign in to comment.