feature: add static context for TestContext
niebayes committed Jan 18, 2024
1 parent a633ad9 commit 2280be3
Showing 9 changed files with 193 additions and 39 deletions.
1 change: 1 addition & 0 deletions src/datanode/src/datanode.rs
@@ -457,6 +457,7 @@ async fn open_all_regions(
table_values: Vec<DatanodeTableValue>,
open_with_writable: bool,
) -> Result<()> {
info!("Opening all regions");
let mut regions = vec![];
for table_value in table_values {
for region_number in table_value.regions {
1 change: 1 addition & 0 deletions src/mito2/src/region/opener.rs
@@ -392,6 +392,7 @@ pub(crate) async fn replay_memtable<S: LogStore>(
version_control: &VersionControlRef,
allow_stale_entries: bool,
) -> Result<EntryId> {
info!("Replaying for region id {}", region_id);
let mut rows_replayed = 0;
// Last entry id should start from flushed entry id since there might be no
// data in the WAL.
1 change: 1 addition & 0 deletions tests-integration/Cargo.toml
@@ -71,6 +71,7 @@ tower = "0.4"
uuid.workspace = true

[dev-dependencies]
common-wal = { workspace = true, features = ["testing"] }
datafusion-expr.workspace = true
datafusion.workspace = true
itertools.workspace = true
2 changes: 1 addition & 1 deletion tests-integration/src/lib.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(assert_matches)]
#![feature(assert_matches, let_chains)]

pub mod cluster;
mod grpc;
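Note: the `let_chains` feature gate added above enables `if let ... && ...` chains on nightly Rust. A minimal, hypothetical sketch of the pattern it permits (not part of this commit; the helper function and its argument are assumptions based on the types introduced below):

use once_cell::sync::OnceCell;

// Returns true when a previously created data home can be reused.
fn data_home_is_reusable(static_context: &OnceCell<StaticContext>) -> bool {
    if let Some(context) = static_context.get() && context.data_home.temp_dir.path().exists() {
        return true;
    }
    false
}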
125 changes: 123 additions & 2 deletions tests-integration/src/standalone.rs
@@ -33,9 +33,12 @@ use datanode::datanode::DatanodeBuilder
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use once_cell::sync::OnceCell;
use servers::Mode;

use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
use crate::test_util::{
self, create_tmp_dir_and_datanode_opts, StaticContext, StorageType, TestGuard,
};

pub struct GreptimeDbStandalone {
pub instance: Arc<Instance>,
@@ -185,7 +188,125 @@ impl GreptimeDbStandaloneBuilder {
instance: Arc::new(instance),
datanode_opts: opts.clone(),
mix_options: MixOptions {
data_home: opts.storage.data_home.to_string(),
data_home: opts.storage.data_home.clone(),
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
},
guard,
}
}

// TODO(niebayes): reuse code.
pub async fn build_with_static_context(
self,
static_context: &OnceCell<StaticContext>,
) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.unwrap_or_default();

let (mut opts, mut guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);

match static_context.get() {
Some(context) => {
let data_home = context
.data_home
.temp_dir
.path()
.to_str()
.unwrap()
.to_string();
opts.storage.data_home = data_home;
guard.home_guard = context.data_home.clone();
}
None => {
static_context
.set(StaticContext {
data_home: guard.home_guard.clone(),
})
.unwrap();
}
}

let procedure_config = ProcedureConfig::default();
let kv_backend_config = KvBackendConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config.clone(),
procedure_config.clone(),
)
.await
.unwrap();

let plugins = self.plugin.unwrap_or_default();

let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.unwrap();

let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_metadata_manager.init().await.unwrap();

let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));

let table_id_sequence = Arc::new(
SequenceBuilder::new("table_id", kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
.build(),
);
let wal_meta = self.meta_wal_config.clone();
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_metadata_manager.clone(),
);

let ddl_task_executor = Arc::new(
DdlManager::try_new(
procedure_manager.clone(),
datanode_manager.clone(),
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
)
.unwrap(),
);

let instance = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(plugins)
.try_build()
.await
.unwrap();

procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();

test_util::prepare_another_catalog_and_schema(&instance).await;

instance.start().await.unwrap();

GreptimeDbStandalone {
instance: Arc::new(instance),
datanode_opts: opts.clone(),
mix_options: MixOptions {
data_home: opts.storage.data_home.clone(),
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
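Note: a minimal usage sketch (not part of this commit) of how a test might share one data home across instance rebuilds via `build_with_static_context`; the builder constructor name and the test name are assumptions:

use once_cell::sync::OnceCell;

// The cell starts empty; the first build stores the freshly created data home, and
// every later build finds it set and reuses the same directory, emulating a restart.
static STATIC_CONTEXT: OnceCell<StaticContext> = OnceCell::new();

async fn rebuild_standalone() -> GreptimeDbStandalone {
    GreptimeDbStandaloneBuilder::new("test_standalone_replay")
        .build_with_static_context(&STATIC_CONTEXT)
        .await
}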
30 changes: 9 additions & 21 deletions tests-integration/src/test_util.rs
@@ -24,7 +24,6 @@ use catalog::kvbackend::KvBackendCatalogManager
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_wal::config::DatanodeWalConfig;
@@ -35,7 +34,6 @@ use datanode::config::{
use frontend::frontend::TomlSerializable;
use frontend::instance::Instance;
use frontend::service_config::{MysqlOptions, PostgresOptions};
use futures::future::BoxFuture;
use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
@@ -271,13 +269,16 @@ pub struct TestGuard {
pub storage_guards: Vec<StorageGuard>,
}

#[derive(Clone, Debug)]
pub struct FileDirGuard {
pub temp_dir: TempDir,
pub temp_dir: Arc<TempDir>,
}

impl FileDirGuard {
pub fn new(temp_dir: TempDir) -> Self {
Self { temp_dir }
Self {
temp_dir: Arc::new(temp_dir),
}
}
}

@@ -657,21 +658,8 @@ pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
.unwrap();
}

pub async fn run_test_with_kafka_wal<F>(test: F)
where
F: FnOnce(Vec<String>) -> BoxFuture<'static, ()>,
{
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return;
}

let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();

test(endpoints).await
/// Static context for RebuildableInstance. This context must be preserved across rebuilds.
#[derive(Debug)]
pub struct StaticContext {
pub data_home: FileDirGuard,
}
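Note: a brief sketch (not part of this commit) of why `FileDirGuard` now wraps its `TempDir` in an `Arc`: cloning the guard shares the same directory, so the data home outlives a single `TestGuard` and can be handed to a rebuilt instance. The directory prefix below is an assumption.

// `create_temp_dir` is the helper from common_test_util imported above.
let home_guard = FileDirGuard::new(create_temp_dir("greptimedb_test_data_home"));
// The clone shares the Arc<TempDir>; the directory is removed only after both
// the guard and the StaticContext holding it have been dropped.
let context = StaticContext { data_home: home_guard.clone() };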
15 changes: 10 additions & 5 deletions tests-integration/src/tests/instance_kafka_wal_test.rs
@@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use common_query::Output;
use common_telemetry::info;
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use frontend::instance::Instance;
use rand::rngs::ThreadRng;
@@ -96,11 +97,12 @@ async fn test_replay(rebuildable_instance: Option<Box<dyn RebuildableMockInstanc
let output = execute_sql(&instance, "create database test").await;
assert_matches!(output, Output::AffectedRows(1));

let tables = create_tables("test_replay", &instance, 3).await;
insert_data(&tables, &instance, 5).await;
let tables = create_tables("test_replay", &instance, 1).await;
insert_data(&tables, &instance, 1).await;
ensure_data_exists(&tables, &instance).await;

// Rebuilds to emulate restart which then triggers a replay.
info!("Rebuilding");
let instance = rebuildable_instance.rebuild().await;
ensure_data_exists(&tables, &instance).await;
}
@@ -115,8 +117,8 @@ async fn test_flush_then_replay(rebuildable_instance: Option<Box<dyn Rebuildable
let output = execute_sql(&instance, "create database test").await;
assert_matches!(output, Output::AffectedRows(1));

let tables = create_tables("test_flush_then_replay", &instance, 3).await;
insert_data(&tables, &instance, 5).await;
let tables = create_tables("test_flush_then_replay", &instance, 1).await;
insert_data(&tables, &instance, 1).await;
ensure_data_exists(&tables, &instance).await;

// Renames tables to force flushing each table.
@@ -149,6 +151,7 @@ async fn create_tables(
instance: &Arc<Instance>,
num_tables: usize,
) -> Vec<Arc<Table>> {
info!("Creating table for {test_name}");
futures::future::join_all((0..num_tables).map(|i| {
let instance = instance.clone();
async move {
Expand All @@ -168,12 +171,13 @@ async fn create_tables(
}

async fn insert_data(tables: &[Arc<Table>], instance: &Arc<Instance>, num_writers: usize) {
info!("Inserting data");
// Each writer randomly chooses a table and inserts a sequence of rows into the table.
futures::future::join_all((0..num_writers).map(|_| async {
let mut rng = rand::thread_rng();
let table = &tables[rng.gen_range(0..tables.len())];
let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed);
for _ in 0..100 {
for _ in 0..1 {
let row = make_row(ts, &mut rng);
assert_matches!(
do_insert(instance, &table.name, row).await,
Expand All @@ -189,6 +193,7 @@ async fn insert_data(tables: &[Arc<Table>], instance: &Arc<Instance>, num_writer
}

async fn ensure_data_exists(tables: &[Arc<Table>], instance: &Arc<Instance>) {
info!("Scanning data");
futures::future::join_all(tables.iter().map(|table| async {
let output = do_query(instance, &table.name).await;
let Output::Stream(stream) = output else {
