From abc9b418960f141955f7a3f17a8e7f2b46c77d5c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 8 Dec 2024 06:03:02 +0530 Subject: [PATCH] refactor: construct once --- src/query/stream_schema_provider.rs | 750 ++++++++++++++-------------- src/storage/azure_blob.rs | 2 +- src/storage/localfs.rs | 2 +- src/storage/object_storage.rs | 8 +- src/storage/s3.rs | 2 +- 5 files changed, 372 insertions(+), 392 deletions(-) diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 6b3d86bfe..2ee906089 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -24,7 +24,7 @@ use crate::{ storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, }; use arrow_array::RecordBatch; -use arrow_schema::{Schema, SchemaRef, SortOptions}; +use arrow_schema::{SchemaRef, SortOptions}; use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, Timelike, Utc}; use datafusion::catalog::Session; @@ -51,7 +51,7 @@ use datafusion::{ scalar::ScalarValue, }; -use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{stream::FuturesOrdered, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::{path::Path, ObjectStore}; use relative_path::RelativePathBuf; @@ -114,57 +114,302 @@ struct StandardTableProvider { url: Url, } -#[allow(clippy::too_many_arguments)] -async fn create_parquet_physical_plan( - object_store_url: ObjectStoreUrl, - partitions: Vec>, - statistics: Statistics, - schema: Arc, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - state: &dyn Session, - time_partition: Option, -) -> Result, DataFusionError> { - let filters = if let Some(expr) = conjunction(filters.to_vec()) { - let table_df_schema = schema.as_ref().clone().to_dfschema()?; - let filters = create_physical_expr(&expr, &table_df_schema, state.execution_props())?; - Some(filters) - } else { - None - }; +impl StandardTableProvider { + #[allow(clippy::too_many_arguments)] + async fn legacy_listing_table( + &self, + glob_storage: Arc, + object_store: Arc, + time_filters: &[PartialTimeFilter], + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + time_partition: Option, + ) -> Result>, DataFusionError> { + let Some(table) = ListingTableBuilder::new(self.stream.clone()) + .populate_via_listing(glob_storage.clone(), object_store, time_filters) + .await? + .build( + self.schema.clone(), + |x| glob_storage.query_prefixes(x), + time_partition, + )? + else { + return Ok(None); + }; - let sort_expr = PhysicalSortExpr { - expr: if let Some(time_partition) = time_partition { - physical_plan::expressions::col(&time_partition, &schema)? - } else { - physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)? 
- }, - options: SortOptions { - descending: true, - nulls_first: true, - }, - }; - let file_format = ParquetFormat::default().with_enable_pruning(true); - - // create the execution plan - let plan = file_format - .create_physical_plan( - state.as_any().downcast_ref::().unwrap(), // Remove this when ParquetFormat catches up - FileScanConfig { - object_store_url, - file_schema: schema.clone(), - file_groups: partitions, + Ok(Some(table.scan(state, projection, filters, limit).await?)) + } + + fn partitioned_files( + &self, + manifest_files: Vec, + ) -> (Vec>, datafusion::common::Statistics) { + let target_partition = num_cpus::get(); + let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); + let mut column_statistics = + HashMap::>::new(); + let mut count = 0; + for (index, file) in manifest_files + .into_iter() + .enumerate() + .map(|(x, y)| (x % target_partition, y)) + { + let catalog::manifest::File { + file_path, + num_rows, + columns, + .. + } = file; + + // object_store::path::Path doesn't automatically deal with Windows path separators + // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem + // before sending the file path to PartitionedFile + // the github issue- https://github.com/parseablehq/parseable/issues/824 + // For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution + // TODO: figure out an elegant solution to this + let pf; + + #[cfg(unix)] + { + pf = PartitionedFile::new(file_path, file.file_size); + } + #[cfg(windows)] + { + pf = if CONFIG.storage_name.eq("drive") { + let file_path = + object_store::path::Path::from_absolute_path(file_path).unwrap(); + PartitionedFile::new(file_path, file.file_size) + } else { + PartitionedFile::new(file_path, file.file_size) + }; + } + + partitioned_files[index].push(pf); + columns.into_iter().for_each(|col| { + column_statistics + .entry(col.name) + .and_modify(|x| { + if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) + { + *x = Some(stats.update(col_stats)); + } + }) + .or_insert_with(|| col.stats.as_ref().cloned()); + }); + count += num_rows; + } + let statistics = self + .schema + .fields() + .iter() + .map(|field| { + column_statistics + .get(field.name()) + .and_then(|stats| stats.as_ref()) + .and_then(|stats| stats.clone().min_max_as_scalar(field.data_type())) + .map(|(min, max)| datafusion::common::ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Exact(max), + min_value: Precision::Exact(min), + distinct_count: Precision::Absent, + }) + .unwrap_or_default() + }) + .collect(); + + let statistics = datafusion::common::Statistics { + num_rows: Precision::Exact(count as usize), + total_byte_size: Precision::Absent, + column_statistics: statistics, + }; + + (partitioned_files, statistics) + } + + #[allow(clippy::too_many_arguments)] + async fn get_cache_exectuion_plan( + &self, + cache_manager: &LocalCacheManager, + manifest_files: &mut Vec, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + state: &dyn Session, + time_partition: Option, + ) -> Result>, DataFusionError> { + let (cached, remainder) = cache_manager + .partition_on_cached(&self.stream, manifest_files.clone(), |file: &File| { + &file.file_path + }) + .await + .map_err(|err| DataFusionError::External(Box::new(err)))?; + + // Assign remaining entries back to manifest list + // This is to be used for remote query + *manifest_files = remainder; + + let cached = cached + .into_iter() + 
.map(|(mut file, cache_path)| { + let cache_path = object_store::path::Path::from_absolute_path(cache_path).unwrap(); + file.file_path = cache_path.to_string(); + file + }) + .collect(); + + let (partitioned_files, statistics) = self.partitioned_files(cached); + let plan = self + .create_parquet_physical_plan( + ObjectStoreUrl::parse("file:///").unwrap(), + partitioned_files, + statistics, + projection, + filters, + limit, + state, + time_partition.clone(), + ) + .await?; + + Ok(Some(plan)) + } + + #[allow(clippy::too_many_arguments)] + async fn get_hottier_exectuion_plan( + &self, + hot_tier_manager: &HotTierManager, + manifest_files: &mut Vec, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + state: &dyn Session, + time_partition: Option, + ) -> Result>, DataFusionError> { + let (hot_tier_files, remainder) = hot_tier_manager + .get_hot_tier_manifest_files(&self.stream, manifest_files.clone()) + .await + .map_err(|err| DataFusionError::External(Box::new(err)))?; + // Assign remaining entries back to manifest list + // This is to be used for remote query + *manifest_files = remainder; + + let hot_tier_files = hot_tier_files + .into_iter() + .map(|mut file| { + let path = CONFIG + .parseable + .hot_tier_storage_path + .as_ref() + .unwrap() + .join(&file.file_path); + file.file_path = path.to_str().unwrap().to_string(); + file + }) + .collect(); + + let (partitioned_files, statistics) = self.partitioned_files(hot_tier_files); + let plan = self + .create_parquet_physical_plan( + ObjectStoreUrl::parse("file:///").unwrap(), + partitioned_files, statistics, - projection: projection.cloned(), + projection, + filters, limit, - output_ordering: vec![vec![sort_expr]], - table_partition_cols: Vec::new(), + state, + time_partition.clone(), + ) + .await?; + + Ok(Some(plan)) + } + + #[allow(clippy::too_many_arguments)] + async fn create_parquet_physical_plan( + &self, + object_store_url: ObjectStoreUrl, + partitions: Vec>, + statistics: Statistics, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + state: &dyn Session, + time_partition: Option, + ) -> Result, DataFusionError> { + let filters = if let Some(expr) = conjunction(filters.to_vec()) { + let table_df_schema = self.schema.as_ref().clone().to_dfschema()?; + let filters = create_physical_expr(&expr, &table_df_schema, state.execution_props())?; + Some(filters) + } else { + None + }; + + let sort_expr = PhysicalSortExpr { + expr: if let Some(time_partition) = time_partition { + physical_plan::expressions::col(&time_partition, &self.schema)? + } else { + physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &self.schema)? 
}, - filters.as_ref(), - ) - .await?; - Ok(plan) + options: SortOptions { + descending: true, + nulls_first: true, + }, + }; + let file_format = ParquetFormat::default().with_enable_pruning(true); + + // create the execution plan + let plan = file_format + .create_physical_plan( + state.as_any().downcast_ref::().unwrap(), // Remove this when ParquetFormat catches up + FileScanConfig { + object_store_url, + file_schema: self.schema.clone(), + file_groups: partitions, + statistics, + projection: projection.cloned(), + limit, + output_ordering: vec![vec![sort_expr]], + table_partition_cols: Vec::new(), + }, + filters.as_ref(), + ) + .await?; + Ok(plan) + } + + fn final_plan( + &self, + execution_plans: Vec>>, + projection: Option<&Vec>, + ) -> Result, DataFusionError> { + let mut execution_plans = execution_plans.into_iter().flatten().collect_vec(); + + let exec: Arc = if execution_plans.is_empty() { + let schema = match projection { + Some(projection) => Arc::new(self.schema.project(projection)?), + None => self.schema.clone(), + }; + Arc::new(EmptyExec::new(schema)) + } else if execution_plans.len() == 1 { + execution_plans.pop().unwrap() + } else { + Arc::new(UnionExec::new(execution_plans)) + }; + Ok(exec) + } + + fn reversed_mem_table( + &self, + mut records: Vec, + ) -> Result { + records + .iter_mut() + .rev() + .for_each(|batch| *batch = crate::utils::arrow::reverse_reader::reverse(batch)); + MemTable::try_new(self.schema.clone(), vec![records]) + } } async fn collect_from_snapshot( @@ -214,88 +459,6 @@ async fn collect_from_snapshot( Ok(manifest_files) } -fn partitioned_files( - manifest_files: Vec, - table_schema: &Schema, -) -> (Vec>, datafusion::common::Statistics) { - let target_partition = num_cpus::get(); - let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); - let mut column_statistics = HashMap::>::new(); - let mut count = 0; - for (index, file) in manifest_files - .into_iter() - .enumerate() - .map(|(x, y)| (x % target_partition, y)) - { - let catalog::manifest::File { - file_path, - num_rows, - columns, - .. 
- } = file; - - // object_store::path::Path doesn't automatically deal with Windows path separators - // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem - // before sending the file path to PartitionedFile - // the github issue- https://github.com/parseablehq/parseable/issues/824 - // For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution - // TODO: figure out an elegant solution to this - let pf; - - #[cfg(unix)] - { - pf = PartitionedFile::new(file_path, file.file_size); - } - #[cfg(windows)] - { - pf = if CONFIG.storage_name.eq("drive") { - let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap(); - PartitionedFile::new(file_path, file.file_size) - } else { - PartitionedFile::new(file_path, file.file_size) - }; - } - - partitioned_files[index].push(pf); - columns.into_iter().for_each(|col| { - column_statistics - .entry(col.name) - .and_modify(|x| { - if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) { - *x = Some(stats.update(col_stats)); - } - }) - .or_insert_with(|| col.stats.as_ref().cloned()); - }); - count += num_rows; - } - let statistics = table_schema - .fields() - .iter() - .map(|field| { - column_statistics - .get(field.name()) - .and_then(|stats| stats.as_ref()) - .and_then(|stats| stats.clone().min_max_as_scalar(field.data_type())) - .map(|(min, max)| datafusion::common::ColumnStatistics { - null_count: Precision::Absent, - max_value: Precision::Exact(max), - min_value: Precision::Exact(min), - distinct_count: Precision::Absent, - }) - .unwrap_or_default() - }) - .collect(); - - let statistics = datafusion::common::Statistics { - num_rows: Precision::Exact(count as usize), - total_byte_size: Precision::Absent, - column_statistics: statistics, - }; - - (partitioned_files, statistics) -} - #[async_trait::async_trait] impl TableProvider for StandardTableProvider { fn as_any(&self) -> &dyn std::any::Any { @@ -342,7 +505,7 @@ impl TableProvider for StandardTableProvider { if let Some(records) = event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) { - let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?; + let reversed_mem_table = self.reversed_mem_table(records)?; memory_exec = Some( reversed_mem_table .scan(state, projection, filters, limit) @@ -353,21 +516,20 @@ impl TableProvider for StandardTableProvider { let mut merged_snapshot: snapshot::Snapshot = Snapshot::default(); if CONFIG.parseable.mode == Mode::Query { let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]); - let obs = glob_storage + if let Ok(obs) = glob_storage .get_objects( Some(&path), Box::new(|file_name| file_name.ends_with("stream.json")), ) - .await; - if let Ok(obs) = obs { + .await + { for ob in obs { if let Ok(object_store_format) = serde_json::from_slice::(&ob) { - let snapshot = object_store_format.snapshot; - for manifest in snapshot.manifest_list { - merged_snapshot.manifest_list.push(manifest); - } + merged_snapshot + .manifest_list + .extend(object_store_format.snapshot.manifest_list); } } } @@ -381,23 +543,20 @@ impl TableProvider for StandardTableProvider { let listing_time_fiters = return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters); - listing_exec = if let Some(listing_time_filter) = listing_time_fiters { - legacy_listing_table( - self.stream.clone(), - glob_storage.clone(), - object_store.clone(), - &listing_time_filter, - self.schema.clone(), - state, - 
projection, - filters, - limit, - time_partition.clone(), - ) - .await? - } else { - None - }; + if let Some(listing_time_filter) = listing_time_fiters { + listing_exec = self + .legacy_listing_table( + glob_storage.clone(), + object_store.clone(), + &listing_time_filter, + state, + projection, + filters, + limit, + time_partition.clone(), + ) + .await? + } } let mut manifest_files = collect_from_snapshot( @@ -410,37 +569,15 @@ impl TableProvider for StandardTableProvider { .await?; if manifest_files.is_empty() { - return final_plan( - vec![listing_exec, memory_exec], - projection, - self.schema.clone(), - ); + return self.final_plan(vec![listing_exec, memory_exec], projection); } // Based on entries in the manifest files, find them in the cache and create a physical plan. if let Some(cache_manager) = LocalCacheManager::global() { - cache_exec = get_cache_exectuion_plan( - cache_manager, - &self.stream, - &mut manifest_files, - self.schema.clone(), - projection, - filters, - limit, - state, - time_partition.clone(), - ) - .await?; - } - - // Hot tier data fetch - if let Some(hot_tier_manager) = HotTierManager::global() { - if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) { - hot_tier_exec = get_hottier_exectuion_plan( - hot_tier_manager, - &self.stream, + cache_exec = self + .get_cache_exectuion_plan( + cache_manager, &mut manifest_files, - self.schema.clone(), projection, filters, limit, @@ -448,32 +585,47 @@ impl TableProvider for StandardTableProvider { time_partition.clone(), ) .await?; + } + + // Hot tier data fetch + if let Some(hot_tier_manager) = HotTierManager::global() { + if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) { + hot_tier_exec = self + .get_hottier_exectuion_plan( + hot_tier_manager, + &mut manifest_files, + projection, + filters, + limit, + state, + time_partition.clone(), + ) + .await?; } } if manifest_files.is_empty() { QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc(); - return final_plan( + return self.final_plan( vec![listing_exec, memory_exec, cache_exec, hot_tier_exec], projection, - self.schema.clone(), ); } - let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema); - let remote_exec = create_parquet_physical_plan( - ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(), - partitioned_files, - statistics, - self.schema.clone(), - projection, - filters, - limit, - state, - time_partition.clone(), - ) - .await?; + let (partitioned_files, statistics) = self.partitioned_files(manifest_files); + let remote_exec = self + .create_parquet_physical_plan( + ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(), + partitioned_files, + statistics, + projection, + filters, + limit, + state, + time_partition.clone(), + ) + .await?; - Ok(final_plan( + Ok(self.final_plan( vec![ listing_exec, memory_exec, @@ -482,7 +634,6 @@ impl TableProvider for StandardTableProvider { Some(remote_exec), ], projection, - self.schema.clone(), )?) 
} @@ -511,170 +662,6 @@ impl TableProvider for StandardTableProvider { } } -#[allow(clippy::too_many_arguments)] -async fn get_cache_exectuion_plan( - cache_manager: &LocalCacheManager, - stream: &str, - manifest_files: &mut Vec, - schema: Arc, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - state: &dyn Session, - time_partition: Option, -) -> Result>, DataFusionError> { - let (cached, remainder) = cache_manager - .partition_on_cached(stream, manifest_files.clone(), |file: &File| { - &file.file_path - }) - .await - .map_err(|err| DataFusionError::External(Box::new(err)))?; - - // Assign remaining entries back to manifest list - // This is to be used for remote query - *manifest_files = remainder; - - let cached = cached - .into_iter() - .map(|(mut file, cache_path)| { - let cache_path = object_store::path::Path::from_absolute_path(cache_path).unwrap(); - file.file_path = cache_path.to_string(); - file - }) - .collect(); - - let (partitioned_files, statistics) = partitioned_files(cached, &schema); - let plan = create_parquet_physical_plan( - ObjectStoreUrl::parse("file:///").unwrap(), - partitioned_files, - statistics, - schema.clone(), - projection, - filters, - limit, - state, - time_partition.clone(), - ) - .await?; - - Ok(Some(plan)) -} - -#[allow(clippy::too_many_arguments)] -async fn get_hottier_exectuion_plan( - hot_tier_manager: &HotTierManager, - stream: &str, - manifest_files: &mut Vec, - schema: Arc, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - state: &dyn Session, - time_partition: Option, -) -> Result>, DataFusionError> { - let (hot_tier_files, remainder) = hot_tier_manager - .get_hot_tier_manifest_files(stream, manifest_files.clone()) - .await - .map_err(|err| DataFusionError::External(Box::new(err)))?; - // Assign remaining entries back to manifest list - // This is to be used for remote query - *manifest_files = remainder; - - let hot_tier_files = hot_tier_files - .into_iter() - .map(|mut file| { - let path = CONFIG - .parseable - .hot_tier_storage_path - .as_ref() - .unwrap() - .join(&file.file_path); - file.file_path = path.to_str().unwrap().to_string(); - file - }) - .collect(); - - let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema); - let plan = create_parquet_physical_plan( - ObjectStoreUrl::parse("file:///").unwrap(), - partitioned_files, - statistics, - schema.clone(), - projection, - filters, - limit, - state, - time_partition.clone(), - ) - .await?; - - Ok(Some(plan)) -} - -#[allow(clippy::too_many_arguments)] -async fn legacy_listing_table( - stream: String, - glob_storage: Arc, - object_store: Arc, - time_filters: &[PartialTimeFilter], - schema: Arc, - state: &dyn Session, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - time_partition: Option, -) -> Result>, DataFusionError> { - let remote_table = ListingTableBuilder::new(stream) - .populate_via_listing(glob_storage.clone(), object_store, time_filters) - .and_then(|builder| async { - let table = builder.build( - schema.clone(), - |x| glob_storage.query_prefixes(x), - time_partition, - )?; - let res = match table { - Some(table) => Some(table.scan(state, projection, filters, limit).await?), - _ => None, - }; - Ok(res) - }) - .await?; - - Ok(remote_table) -} - -fn final_plan( - execution_plans: Vec>>, - projection: Option<&Vec>, - schema: Arc, -) -> Result, DataFusionError> { - let mut execution_plans = execution_plans.into_iter().flatten().collect_vec(); - - let exec: Arc = if execution_plans.is_empty() { - let schema = match 
projection {
-            Some(projection) => Arc::new(schema.project(projection)?),
-            None => schema,
-        };
-        Arc::new(EmptyExec::new(schema))
-    } else if execution_plans.len() == 1 {
-        execution_plans.pop().unwrap()
-    } else {
-        Arc::new(UnionExec::new(execution_plans))
-    };
-    Ok(exec)
-}
-
-fn reversed_mem_table(
-    mut records: Vec<RecordBatch>,
-    schema: Arc<Schema>,
-) -> Result<MemTable, DataFusionError> {
-    records[..].reverse();
-    records
-        .iter_mut()
-        .for_each(|batch| *batch = crate::utils::arrow::reverse_reader::reverse(batch));
-    MemTable::try_new(schema, vec![records])
-}
-
 #[derive(Debug, Clone)]
 pub enum PartialTimeFilter {
     Low(Bound<NaiveDateTime>),
@@ -741,19 +728,13 @@ fn is_overlapping_query(
         return true;
     };
 
-    for filter in time_filters {
-        match filter {
-            PartialTimeFilter::Low(Bound::Excluded(time))
-            | PartialTimeFilter::Low(Bound::Included(time)) => {
-                if time < &first_entry_lower_bound.naive_utc() {
-                    return true;
-                }
-            }
-            _ => {}
+    time_filters.iter().any(|filter| match filter {
+        PartialTimeFilter::Low(Bound::Excluded(time))
+        | PartialTimeFilter::Low(Bound::Included(time)) => {
+            time < &first_entry_lower_bound.naive_utc()
         }
-    }
-
-    false
+        _ => false,
+    })
 }
 
 /// This function will accept time filters provided to the query and will split them
@@ -955,22 +936,15 @@ trait ManifestExt: ManifestFile {
     }
 
     fn can_be_pruned(&self, partial_filter: &Expr) -> bool {
-        fn extract_op_scalar(expr: &Expr) -> Option<(Operator, &ScalarValue)> {
-            let Expr::BinaryExpr(expr) = expr else {
-                return None;
-            };
-            let Expr::Literal(value) = &*expr.right else {
-                return None;
-            };
-            /* `BinaryExp` doesn't implement `Copy` */
-            Some((expr.op, value))
-        }
-
         let Some(col) = self.find_matching_column(partial_filter) else {
             return false;
         };
 
-        let Some((op, value)) = extract_op_scalar(partial_filter) else {
+        let Expr::BinaryExpr(expr) = partial_filter else {
+            return false;
+        };
+
+        let Expr::Literal(value) = expr.right.as_ref() else {
             return false;
         };
 
@@ -982,7 +956,7 @@ trait ManifestExt: ManifestFile {
             return false;
         };
 
-        !satisfy_constraints(value, op, stats).unwrap_or(true)
+        !satisfy_constraints(value, expr.op, stats).unwrap_or(true)
     }
 }
diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index d50e2d901..5875ec093 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -163,7 +163,7 @@ impl ObjectStorageProvider for AzureBlobConfig {
         RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
     }
 
-    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
+    fn construct_client(&self) -> Arc<dyn ObjectStorage> {
         let azure = self.get_default_builder().build().unwrap();
         // limit objectstore to a concurrent request limit
         let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index b3d3e09cd..78b8114c0 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -67,7 +67,7 @@ impl ObjectStorageProvider for FSConfig {
         RuntimeConfig::new()
     }
 
-    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
+    fn construct_client(&self) -> Arc<dyn ObjectStorage> {
         Arc::new(LocalFS::new(self.root.clone()))
     }
 
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 78f51685d..8d83c297a 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -46,6 +46,7 @@ use bytes::Bytes;
 use chrono::Local;
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
 use itertools::Itertools;
+use once_cell::sync::OnceCell;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
 
@@ -60,7 +61,12 @@ use std::{
 
 pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
     fn get_datafusion_runtime(&self) -> RuntimeConfig;
-    fn get_object_store(&self) -> Arc<dyn ObjectStorage>;
+    fn construct_client(&self) -> Arc<dyn ObjectStorage>;
+    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
+        static STORE: OnceCell<Arc<dyn ObjectStorage>> = OnceCell::new();
+
+        STORE.get_or_init(|| self.construct_client()).clone()
+    }
     fn get_endpoint(&self) -> String;
     fn register_store_metrics(&self, handler: &PrometheusMetrics);
 }
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index c04bab9af..8133ac44f 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -298,7 +298,7 @@ impl ObjectStorageProvider for S3Config {
         RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
     }
 
-    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
+    fn construct_client(&self) -> Arc<dyn ObjectStorage> {
         let s3 = self.get_default_builder().build().unwrap();
         // limit objectstore to a concurrent request limit