diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 42496b8458ce..30f447df7ce3 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -457,6 +457,7 @@ async fn open_all_regions(
     table_values: Vec<DatanodeTableValue>,
     open_with_writable: bool,
 ) -> Result<()> {
+    info!("Opening all regions");
     let mut regions = vec![];
     for table_value in table_values {
         for region_number in table_value.regions {
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index e4d52244efb8..35d8f117043b 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -392,6 +392,7 @@ pub(crate) async fn replay_memtable(
     version_control: &VersionControlRef,
     allow_stale_entries: bool,
 ) -> Result<EntryId> {
+    info!("Replaying for region id {}", region_id);
     let mut rows_replayed = 0;
     // Last entry id should start from flushed entry id since there might be no
     // data in the WAL.
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 4b49d0c6a586..d214f8d2eee6 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -71,6 +71,7 @@ tower = "0.4"
 uuid.workspace = true
 
 [dev-dependencies]
+common-wal = { workspace = true, features = ["testing"] }
 datafusion-expr.workspace = true
 datafusion.workspace = true
 itertools.workspace = true
diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs
index d3e700151345..8c0719c32c07 100644
--- a/tests-integration/src/lib.rs
+++ b/tests-integration/src/lib.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![feature(assert_matches)]
+#![feature(assert_matches, let_chains)]
 
 pub mod cluster;
 mod grpc;
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 2d6d00ef2e87..79ce240b6854 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -33,9 +33,12 @@ use datanode::datanode::DatanodeBuilder;
 use frontend::frontend::FrontendOptions;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
+use once_cell::sync::OnceCell;
 use servers::Mode;
 
-use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
+use crate::test_util::{
+    self, create_tmp_dir_and_datanode_opts, StaticContext, StorageType, TestGuard,
+};
 
 pub struct GreptimeDbStandalone {
     pub instance: Arc<Instance>,
@@ -185,7 +188,125 @@ impl GreptimeDbStandaloneBuilder {
             instance: Arc::new(instance),
             datanode_opts: opts.clone(),
             mix_options: MixOptions {
-                data_home: opts.storage.data_home.to_string(),
+                data_home: opts.storage.data_home.clone(),
+                procedure: procedure_config,
+                metadata_store: kv_backend_config,
+                frontend: FrontendOptions::default(),
+                datanode: opts,
+                logging: LoggingOptions::default(),
+                wal_meta,
+            },
+            guard,
+        }
+    }
+
+    // TODO(niebayes): reuse code.
+    pub async fn build_with_static_context(
+        self,
+        static_context: &OnceCell<StaticContext>,
+    ) -> GreptimeDbStandalone {
+        let default_store_type = self.default_store.unwrap_or(StorageType::File);
+        let store_types = self.store_providers.unwrap_or_default();
+
+        let (mut opts, mut guard) = create_tmp_dir_and_datanode_opts(
+            Mode::Standalone,
+            default_store_type,
+            store_types,
+            &self.instance_name,
+            self.wal_config.clone(),
+        );
+
+        match static_context.get() {
+            Some(context) => {
+                let data_home = context
+                    .data_home
+                    .temp_dir
+                    .path()
+                    .to_str()
+                    .unwrap()
+                    .to_string();
+                opts.storage.data_home = data_home;
+                guard.home_guard = context.data_home.clone();
+            }
+            None => {
+                static_context
+                    .set(StaticContext {
+                        data_home: guard.home_guard.clone(),
+                    })
+                    .unwrap();
+            }
+        }
+
+        let procedure_config = ProcedureConfig::default();
+        let kv_backend_config = KvBackendConfig::default();
+        let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
+            format!("{}/kv", &opts.storage.data_home),
+            kv_backend_config.clone(),
+            procedure_config.clone(),
+        )
+        .await
+        .unwrap();
+
+        let plugins = self.plugin.unwrap_or_default();
+
+        let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone())
+            .with_kv_backend(kv_backend.clone())
+            .build()
+            .await
+            .unwrap();
+
+        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
+        table_metadata_manager.init().await.unwrap();
+
+        let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
+
+        let table_id_sequence = Arc::new(
+            SequenceBuilder::new("table_id", kv_backend.clone())
+                .initial(MIN_USER_TABLE_ID as u64)
+                .step(10)
+                .build(),
+        );
+        let wal_meta = self.meta_wal_config.clone();
+        let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
+            wal_meta.clone(),
+            kv_backend.clone(),
+        ));
+        let table_meta_allocator = TableMetadataAllocator::new(
+            table_id_sequence,
+            wal_options_allocator.clone(),
+            table_metadata_manager.clone(),
+        );
+
+        let ddl_task_executor = Arc::new(
+            DdlManager::try_new(
+                procedure_manager.clone(),
+                datanode_manager.clone(),
+                Arc::new(DummyCacheInvalidator),
+                table_metadata_manager,
+                table_meta_allocator,
+                Arc::new(MemoryRegionKeeper::default()),
+            )
+            .unwrap(),
+        );
+
+        let instance = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
+            .with_plugin(plugins)
+            .try_build()
+            .await
+            .unwrap();
+
+        procedure_manager.start().await.unwrap();
+        wal_options_allocator.start().await.unwrap();
+
+        test_util::prepare_another_catalog_and_schema(&instance).await;
+
+        instance.start().await.unwrap();
+
+        GreptimeDbStandalone {
+            instance: Arc::new(instance),
+            datanode_opts: opts.clone(),
+            mix_options: MixOptions {
+                data_home: opts.storage.data_home.clone(),
                 procedure: procedure_config,
                 metadata_store: kv_backend_config,
                 frontend: FrontendOptions::default(),
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 623ffc27cac6..c527c7a5e005 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -24,7 +24,6 @@ use catalog::kvbackend::KvBackendCatalogManager;
 use common_meta::key::catalog_name::CatalogNameKey;
 use common_meta::key::schema_name::SchemaNameKey;
 use common_runtime::Builder as RuntimeBuilder;
-use common_telemetry::warn;
 use common_test_util::ports;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
 use common_wal::config::DatanodeWalConfig;
@@ -35,7 +34,6 @@ use datanode::config::{
 use frontend::frontend::TomlSerializable;
 use frontend::instance::Instance;
 use frontend::service_config::{MysqlOptions, PostgresOptions};
-use futures::future::BoxFuture;
 use object_store::services::{Azblob, Gcs, Oss, S3};
 use object_store::test_util::TempFolder;
 use object_store::ObjectStore;
@@ -271,13 +269,16 @@ pub struct TestGuard {
     pub storage_guards: Vec<StorageGuard>,
 }
 
+#[derive(Clone, Debug)]
 pub struct FileDirGuard {
-    pub temp_dir: TempDir,
+    pub temp_dir: Arc<TempDir>,
 }
 
 impl FileDirGuard {
     pub fn new(temp_dir: TempDir) -> Self {
-        Self { temp_dir }
+        Self {
+            temp_dir: Arc::new(temp_dir),
+        }
     }
 }
 
@@ -657,21 +658,8 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
         .unwrap();
 }
 
-pub async fn run_test_with_kafka_wal<F>(test: F)
-where
-    F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
-{
-    let _ = dotenv::dotenv();
-    let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
-    if endpoints.is_empty() {
-        warn!("The endpoints is empty, skipping the test");
-        return;
-    }
-
-    let endpoints = endpoints
-        .split(',')
-        .map(|s| s.trim().to_string())
-        .collect::<Vec<_>>();
-
-    test(endpoints).await
+/// Static context for RebuildableInstance. This context must be preserved across rebuilds.
+#[derive(Debug)]
+pub struct StaticContext {
+    pub data_home: FileDirGuard,
 }
diff --git a/tests-integration/src/tests/instance_kafka_wal_test.rs b/tests-integration/src/tests/instance_kafka_wal_test.rs
index dc4089f54a33..b21299140a9b 100644
--- a/tests-integration/src/tests/instance_kafka_wal_test.rs
+++ b/tests-integration/src/tests/instance_kafka_wal_test.rs
@@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 
 use common_query::Output;
+use common_telemetry::info;
 use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
 use frontend::instance::Instance;
 use rand::rngs::ThreadRng;
@@ -96,11 +97,12 @@ async fn test_replay(rebuildable_instance: Option<Box<dyn RebuildableMockInstance>>) {
 }
 
 async fn create_tables(
     test_name: &str,
     instance: &Arc<Instance>,
     num_tables: usize,
 ) -> Vec<Arc<Table>> {
+    info!("Creating table for {test_name}");
     futures::future::join_all((0..num_tables).map(|i| {
         let instance = instance.clone();
         async move {
@@ -168,12 +171,13 @@ async fn create_tables(
 }
 
 async fn insert_data(tables: &[Arc<Table>], instance: &Arc<Instance>, num_writers: usize) {
+    info!("Inserting data");
     // Each writer randomly chooses a table and inserts a sequence of rows into the table.
     futures::future::join_all((0..num_writers).map(|_| async {
         let mut rng = rand::thread_rng();
         let table = &tables[rng.gen_range(0..tables.len())];
         let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed);
-        for _ in 0..100 {
+        for _ in 0..1 {
             let row = make_row(ts, &mut rng);
             assert_matches!(
                 do_insert(instance, &table.name, row).await,
@@ -189,6 +193,7 @@ async fn insert_data(tables: &[Arc<Table>], instance: &Arc<Instance>, num_writer
 }
 
 async fn ensure_data_exists(tables: &[Arc<Table>], instance: &Arc<Instance>) {
+    info!("Scanning data");
     futures::future::join_all(tables.iter().map(|table| async {
         let output = do_query(instance, &table.name).await;
         let Output::Stream(stream) = output else {
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index 43cc4c37acb3..b3c02bb22b8b 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -22,11 +22,12 @@ use common_test_util::find_workspace_path;
 use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
 use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
 use frontend::instance::Instance;
+use once_cell::sync::OnceCell;
 use rstest_reuse::{self, template};
 
 use crate::cluster::GreptimeDbClusterBuilder;
 use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
-use crate::test_util::StorageType;
+use crate::test_util::{StaticContext, StorageType};
 use crate::tests::{create_distributed_instance, MockDistributedInstance};
 
 #[async_trait::async_trait]
@@ -77,25 +78,61 @@ impl MockInstanceBuilder {
             }
         }
     }
+
+    // TODO(niebayes): implement build_with_static_context for cluster builder.
+    async fn build_with_static_context(
+        &self,
+        static_context: &OnceCell<StaticContext>,
+    ) -> Arc<dyn MockInstance> {
+        match self {
+            MockInstanceBuilder::Standalone(builder) => Arc::new(
+                builder
+                    .clone()
+                    .build_with_static_context(static_context)
+                    .await,
+            ),
+            MockInstanceBuilder::Distributed(builder) => {
+                Arc::new(MockDistributedInstance(builder.clone().build().await))
+            }
+        }
+    }
 }
 
 pub(crate) struct TestContext {
     instance: Arc<dyn MockInstance>,
     builder: MockInstanceBuilder,
+    static_context: OnceCell<StaticContext>,
 }
 
 impl TestContext {
-    async fn new(builder: MockInstanceBuilder) -> Self {
-        let instance = builder.build().await;
-
-        Self { instance, builder }
+    async fn new(builder: MockInstanceBuilder, has_static_context: bool) -> Self {
+        // Initializes the static context on the first build.
+        let static_context = OnceCell::new();
+        let instance = if has_static_context {
+            builder.build_with_static_context(&static_context).await
+        } else {
+            builder.build().await
+        };
+
+        Self {
+            instance,
+            builder,
+            static_context,
+        }
     }
 }
 
 #[async_trait::async_trait]
 impl RebuildableMockInstance for TestContext {
     async fn rebuild(&mut self) -> Arc<Instance> {
-        let instance = self.builder.build().await;
+        let instance = match self.static_context.get() {
+            Some(_) => {
+                self.builder
+                    .build_with_static_context(&self.static_context)
+                    .await
+            }
+            None => self.builder.build().await,
+        };
         self.instance = instance;
         self.instance.frontend()
     }
@@ -171,7 +208,7 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option Option