diff --git a/kernel/src/engine/arrow_expression.rs b/kernel/src/engine/arrow_expression.rs
index 33bf4c47d..5c224e185 100644
--- a/kernel/src/engine/arrow_expression.rs
+++ b/kernel/src/engine/arrow_expression.rs
@@ -1,4 +1,5 @@
 //! Expression handling based on arrow-rs compute kernels.
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene};
@@ -19,18 +20,77 @@ use arrow_schema::{
 };
 use arrow_select::concat::concat;
 use itertools::Itertools;
+use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
+use parquet::arrow::ProjectionMask;
 
 use super::arrow_conversion::LIST_ARRAY_ROOT;
 use crate::engine::arrow_data::ArrowEngineData;
-use crate::engine::arrow_utils::ensure_data_types;
 use crate::engine::arrow_utils::prim_array_cmp;
+use crate::engine::arrow_utils::{ensure_data_types, generate_mask, get_requested_indices};
 use crate::error::{DeltaResult, Error};
 use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
 use crate::schema::{DataType, PrimitiveType, SchemaRef};
 use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use parquet::schema::types::SchemaDescriptor;
 
 // TODO leverage scalars / Datum
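+/// Convert a kernel [`Expression`] into a parquet [`RowFilter`] so the predicate is
+/// evaluated while row groups are decoded. The predicate's root columns are projected
+/// out of `requested_schema`, a `ProjectionMask` covering them is derived, and
+/// `evaluate_expression` is wrapped in an [`ArrowPredicateFn`]; rows for which the
+/// predicate does not evaluate to `true` are dropped by the reader.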
+pub fn expression_to_row_filter(
+    predicate: Expression,
+    requested_schema: &SchemaRef,
+    parquet_schema: &ArrowSchemaRef,
+    parquet_physical_schema: &SchemaDescriptor,
+) -> DeltaResult<RowFilter> {
+    let cols = get_columns_from_expression(&predicate);
+    let expr_schema = requested_schema.project(&cols)?;
+    let (indices, _) = get_requested_indices(&expr_schema, parquet_schema)?;
+    let projection_mask = generate_mask(
+        &expr_schema,
+        parquet_schema,
+        parquet_physical_schema,
+        &indices,
+    )
+    .unwrap_or(ProjectionMask::all());
+    let arrow_predicate = ArrowPredicateFn::new(projection_mask, move |batch| {
+        downcast_to_bool(
+            &evaluate_expression(&predicate, &batch, None)
+                .map_err(|err| ArrowError::ExternalError(Box::new(err)))?,
+        )
+        .map_err(|err| ArrowError::ExternalError(Box::new(err)))
+        .cloned()
+    });
+    Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
+}
+
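+/// Collect the distinct root column names referenced by an expression. A nested column
+/// reference such as `metaData.id` contributes only its root (`metaData`), since the
+/// schema projection above operates on top-level fields.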
+pub fn get_columns_from_expression(expr: &Expression) -> Vec<&str> {
+    fn get_columns_from_expression_impl<'a>(expr: &'a Expression, out: &mut HashSet<&'a str>) {
+        match expr {
+            Expression::Column(col_name) => {
+                let root_name = col_name.split('.').next().unwrap_or(col_name);
+                out.insert(root_name);
+            }
+            Expression::Struct(fields) => fields
+                .iter()
+                .for_each(|expr| get_columns_from_expression_impl(expr, out)),
+            Expression::BinaryOperation { op: _, left, right } => {
+                get_columns_from_expression_impl(left, out);
+                get_columns_from_expression_impl(right, out);
+            }
+            Expression::UnaryOperation { op: _, expr } => {
+                get_columns_from_expression_impl(expr, out)
+            }
+            Expression::VariadicOperation { op: _, exprs } => exprs
+                .iter()
+                .for_each(|expr| get_columns_from_expression_impl(expr, out)),
+            Expression::Literal(_) => (),
+        }
+    }
+    let mut out = HashSet::new();
+    get_columns_from_expression_impl(expr, &mut out);
+    out.into_iter().collect_vec()
+}
+
 fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> {
     arr.as_any()
         .downcast_ref::<BooleanArray>()
diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index 46bff22cb..6c15b4667 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -12,6 +12,7 @@ use parquet::arrow::arrow_reader::{
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
 
 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
+use crate::engine::arrow_expression::expression_to_row_filter;
 use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
 use crate::engine::default::executor::TaskExecutor;
 use crate::schema::SchemaRef;
@@ -47,7 +48,7 @@ impl ParquetHandler for DefaultParquetHandler {
         &self,
         files: &[FileMeta],
         physical_schema: SchemaRef,
-        _predicate: Option<Expression>,
+        predicate: Option<Expression>,
     ) -> DeltaResult<FileDataReadResultIterator> {
         if files.is_empty() {
             return Ok(Box::new(std::iter::empty()));
@@ -62,11 +63,16 @@ impl ParquetHandler for DefaultParquetHandler {
         // -> parse to parquet
         // SAFETY: we did is_empty check above, this is ok.
         let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
-            "http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
+            "http" | "https" => Box::new(PresignedUrlOpener::new(
+                1024,
+                physical_schema.clone(),
+                predicate,
+            )),
             _ => Box::new(ParquetOpener::new(
                 1024,
                 physical_schema.clone(),
                 self.store.clone(),
+                predicate,
             )),
         };
         FileStream::new_async_read_iterator(
@@ -86,6 +92,7 @@ struct ParquetOpener {
     limit: Option<usize>,
     table_schema: SchemaRef,
     store: Arc<DynObjectStore>,
+    predicate: Option<Expression>,
 }
 
 impl ParquetOpener {
@@ -93,12 +100,14 @@
         batch_size: usize,
         table_schema: SchemaRef,
         store: Arc<DynObjectStore>,
+        predicate: Option<Expression>,
     ) -> Self {
         Self {
             batch_size,
             table_schema,
             limit: None,
             store,
+            predicate,
         }
     }
 }
@@ -112,7 +121,7 @@ impl FileOpener for ParquetOpener {
         // let projection = self.projection.clone();
         let table_schema = self.table_schema.clone();
         let limit = self.limit;
-
+        let predicate = self.predicate.clone();
         Ok(Box::pin(async move {
             // TODO avoid IO by converting passed file meta to ObjectMeta
             let meta = store.head(&path).await?;
@@ -124,6 +133,17 @@ impl FileOpener for ParquetOpener {
             let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
             let mut builder =
                 ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
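+            // If the engine pushed down a predicate, convert it into a parquet
+            // `RowFilter` so that rows which cannot match are skipped while the
+            // file is decoded, before batches are handed back to the engine.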
+            if let Some(predicate) = predicate {
+                let parquet_schema = metadata.schema();
+                let parquet_physical_schema = metadata.parquet_schema();
+                let row_filter = expression_to_row_filter(
+                    predicate,
+                    &table_schema,
+                    parquet_schema,
+                    parquet_physical_schema,
+                )?;
+                builder = builder.with_row_filter(row_filter);
+            }
             if let Some(mask) = generate_mask(
                 &table_schema,
                 parquet_schema,
@@ -156,15 +176,17 @@ struct PresignedUrlOpener {
     limit: Option<usize>,
     table_schema: SchemaRef,
     client: reqwest::Client,
+    predicate: Option<Expression>,
 }
 
 impl PresignedUrlOpener {
-    pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
+    pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
         Self {
             batch_size,
             table_schema: schema,
             limit: None,
             client: reqwest::Client::new(),
+            predicate,
         }
     }
 }
@@ -175,6 +197,7 @@ impl FileOpener for PresignedUrlOpener {
         let table_schema = self.table_schema.clone();
         let limit = self.limit;
         let client = self.client.clone(); // uses Arc internally according to reqwest docs
+        let predicate = self.predicate.clone();
 
         Ok(Box::pin(async move {
             // fetch the file from the interweb
@@ -196,6 +219,16 @@ impl FileOpener for PresignedUrlOpener {
                 builder = builder.with_projection(mask)
             }
 
+            if let Some(predicate) = predicate {
+                let parquet_physical_schema = metadata.parquet_schema();
+                let row_filter = expression_to_row_filter(
+                    predicate,
+                    &table_schema,
+                    parquet_schema,
+                    parquet_physical_schema,
+                )?;
+                builder = builder.with_row_filter(row_filter);
+            }
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
@@ -216,14 +249,17 @@ impl FileOpener for PresignedUrlOpener {
 
 #[cfg(test)]
 mod tests {
+    use std::ops::Not;
     use std::path::PathBuf;
 
     use arrow_array::RecordBatch;
     use object_store::{local::LocalFileSystem, ObjectStore};
 
+    use crate::actions::{Metadata, Protocol};
     use crate::engine::arrow_data::ArrowEngineData;
     use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
     use crate::EngineData;
+    use crate::Expression as Expr;
 
     use itertools::Itertools;
@@ -237,13 +273,10 @@ mod tests {
             .map(Into::into)
     }
 
-    #[tokio::test]
-    async fn test_read_parquet_files() {
+    async fn get_record_batches(path: &str, predicate: Option<Expression>) -> Vec<RecordBatch> {
         let store = Arc::new(LocalFileSystem::new());
 
-        let path = std::fs::canonicalize(PathBuf::from(
-            "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
-        )).unwrap();
+        let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
         let url = url::Url::from_file_path(path).unwrap();
         let location = Path::from(url.path());
         let meta = store.head(&location).await.unwrap();
@@ -262,14 +295,80 @@ mod tests {
         }];
 
         let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
-        let data: Vec<RecordBatch> = handler
-            .read_parquet_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
+        handler
+            .read_parquet_files(
+                files,
+                Arc::new(physical_schema.try_into().unwrap()),
+                predicate,
+            )
             .unwrap()
             .map(into_record_batch)
             .try_collect()
-            .unwrap();
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_read_parquet_files_with_empty_output() {
+        let predicate = Some(Expr::lt(Expr::column("value"), Expr::literal(0)));
+        let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
+        let data = get_record_batches(path, predicate).await;
+        assert_eq!(data.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_read_parquet_files_filter_data() {
+        let cases = [
+            (
+                5,
+                Some(Expr::gt_eq(Expr::column("value"), Expr::literal(5))),
+            ),
+            (1, Some(Expr::gt(Expr::column("value"), Expr::literal(8)))),
+            (10, None),
+        ];
+        let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
+        for (num_rows, predicate) in cases {
+            let data = get_record_batches(path, predicate).await;
+            assert_eq!(data.len(), 1);
+            assert_eq!(data[0].num_rows(), num_rows);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_parquet_protocol_metadata_filter() {
+        let predicate = Some(Expr::or(
+            Expr::not(Expr::is_null(Expr::column("metaData.id"))),
+            Expr::not(Expr::is_null(Expr::column("protocol.minReaderVersion"))),
+        ));
+
+        let path =
+            "./tests/data/app-txn-checkpoint/_delta_log/00000000000000000001.checkpoint.parquet";
+        let data_filtered = get_record_batches(path, predicate).await;
+        let data = get_record_batches(path, None).await;
+
+        let mut metadata_filtered: Vec<Metadata> = vec![];
+        let mut metadata_expected: Vec<Metadata> = vec![];
+        let mut protocol_filtered: Vec<Protocol> = vec![];
+        let mut protocol_expected: Vec<Protocol> = vec![];
+
+        for batch in data_filtered.into_iter().map(Into::<ArrowEngineData>::into) {
+            if let Some(metadata) = Metadata::try_new_from_data(&batch).unwrap() {
+                metadata_filtered.push(metadata);
+            } else if let Some(protocol) = Protocol::try_new_from_data(&batch).unwrap() {
+                protocol_filtered.push(protocol);
+            } else {
+                panic!("The filtered data must only have metadata or protocol entries");
+            }
+        }
+        for batch in data.into_iter().map(Into::<ArrowEngineData>::into) {
+            if let Some(metadata) = Metadata::try_new_from_data(&batch).unwrap() {
+                metadata_expected.push(metadata);
+            }
+            if let Some(protocol) = Protocol::try_new_from_data(&batch).unwrap() {
+                protocol_expected.push(protocol);
+            }
+        }
 
-        assert_eq!(data.len(), 1);
-        assert_eq!(data[0].num_rows(), 10);
+        assert_eq!(metadata_filtered, metadata_expected);
+        assert_eq!(protocol_filtered, protocol_expected);
     }
 }
diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs
index 860a490e1..f22d03927 100644
--- a/kernel/src/engine/sync/parquet.rs
+++ b/kernel/src/engine/sync/parquet.rs
@@ -5,13 +5,18 @@ use tracing::debug;
 use url::Url;
 
 use crate::engine::arrow_data::ArrowEngineData;
+use crate::engine::arrow_expression::expression_to_row_filter;
 use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
 use crate::schema::SchemaRef;
 use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};
 
 pub(crate) struct SyncParquetHandler;
 
-fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult<ArrowEngineData> {
+fn try_create_from_parquet(
+    schema: SchemaRef,
+    location: Url,
+    predicate: Option<Expression>,
+) -> DeltaResult<ArrowEngineData> {
     let file = File::open(
         location
             .to_file_path()
             .map_err(|_| Error::generic("can only read local files"))?,
     )?;
@@ -25,6 +30,13 @@ fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult<ArrowEngineData> {
+    if let Some(predicate) = predicate {
+        let parquet_physical_schema = metadata.parquet_schema();
+        let row_filter = expression_to_row_filter(
+            predicate,
+            &schema,
+            parquet_schema,
+            parquet_physical_schema,
+        )?;
+        builder = builder.with_row_filter(row_filter);
+    }
@@ -41,12 +53,13 @@ impl ParquetHandler for SyncParquetHandler {
     fn read_parquet_files(
         &self,
         files: &[FileMeta],
         schema: SchemaRef,
-        _predicate: Option<Expression>,
+        predicate: Option<Expression>,
     ) -> DeltaResult<FileDataReadResultIterator> {
         let locations: Vec<_> = files.iter().map(|file| file.location.clone()).collect();
         Ok(Box::new(locations.into_iter().map(move |location| {
-            try_create_from_parquet(schema.clone(), location).map(|d| Box::new(d) as _)
+            try_create_from_parquet(schema.clone(), location, predicate.clone())
+                .map(|d| Box::new(d) as _)
         })))
     }
 }
diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs
index 5a02bcefb..803e3afa1 100644
--- a/kernel/src/snapshot.rs
+++ b/kernel/src/snapshot.rs
@@ -414,6 +414,7 @@ fn list_log_files(
 mod tests {
     use super::*;
 
+    use std::collections::HashMap;
     use std::path::PathBuf;
     use std::sync::Arc;
 
@@ -422,19 +423,53 @@ mod tests {
     use object_store::path::Path;
     use object_store::ObjectStore;
 
+    use crate::actions::Format;
     use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
     use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
     use crate::engine::sync::SyncEngine;
     use crate::schema::StructType;
 
-    #[test]
-    fn test_snapshot_read_metadata() {
-        let path =
-            std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
+    fn get_snapshot_from_path(path: &str, version: Option<Version>) -> Snapshot {
+        let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
         let url = url::Url::from_directory_path(path).unwrap();
-
         let engine = SyncEngine::new();
-        let snapshot = Snapshot::try_new(url, &engine, Some(1)).unwrap();
+        Snapshot::try_new(url, &engine, version).unwrap()
+    }
+
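+    // Replays a table whose log contains a parquet checkpoint, so reading protocol and
+    // metadata goes through the parquet handlers changed above; schema, protocol, and
+    // metadata must all survive the predicate pushdown intact.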
"e7802058-f49c-4f0b-937f-82a3e42781a3".into(), + name: None, + description: None, + format: Format { + provider: "parquet".into(), + options: HashMap::new(), + }, + schema_string: schema_string.into(), + partition_columns: vec!["modified".into()], + created_time: Some(1713400874275), + configuration: HashMap::new(), + }; + assert_eq!(snapshot.metadata(), &expected_metadata); + } + #[test] + fn test_snapshot_read_metadata() { + let snapshot = get_snapshot_from_path("./tests/data/table-with-dv-small/", None); let expected = Protocol { min_reader_version: 3, @@ -451,12 +486,7 @@ mod tests { #[test] fn test_new_snapshot() { - let path = - std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); - let url = url::Url::from_directory_path(path).unwrap(); - - let engine = SyncEngine::new(); - let snapshot = Snapshot::try_new(url, &engine, None).unwrap(); + let snapshot = get_snapshot_from_path("./tests/data/table-with-dv-small/", None); let expected = Protocol { min_reader_version: 3, @@ -533,13 +563,8 @@ mod tests { #[test_log::test] fn test_read_table_with_checkpoint() { - let path = std::fs::canonicalize(PathBuf::from( - "./tests/data/with_checkpoint_no_last_checkpoint/", - )) - .unwrap(); - let location = url::Url::from_directory_path(path).unwrap(); - let engine = SyncEngine::new(); - let snapshot = Snapshot::try_new(location, &engine, None).unwrap(); + let snapshot = + get_snapshot_from_path("./tests/data/with_checkpoint_no_last_checkpoint/", None); assert_eq!(snapshot.log_segment.checkpoint_files.len(), 1); assert_eq!(