Implement predicate pushdown for parquet reader #349
base: main
Changes from all commits
@@ -12,6 +12,7 @@ use parquet::arrow::arrow_reader::{
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
+use crate::engine::arrow_expression::expression_to_row_filter;
 use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
 use crate::engine::default::executor::TaskExecutor;
 use crate::schema::SchemaRef;
@@ -47,7 +48,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
         &self,
         files: &[FileMeta],
         physical_schema: SchemaRef,
-        _predicate: Option<Expression>,
+        predicate: Option<Expression>,
     ) -> DeltaResult<FileDataReadResultIterator> {
         if files.is_empty() {
             return Ok(Box::new(std::iter::empty()));
@@ -62,11 +63,16 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
         // -> parse to parquet
         // SAFETY: we did is_empty check above, this is ok.
         let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
-            "http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
+            "http" | "https" => Box::new(PresignedUrlOpener::new(
+                1024,
+                physical_schema.clone(),
+                predicate,
+            )),
             _ => Box::new(ParquetOpener::new(
                 1024,
                 physical_schema.clone(),
                 self.store.clone(),
+                predicate,
             )),
         };
         FileStream::new_async_read_iterator(
@@ -86,19 +92,22 @@ struct ParquetOpener {
     limit: Option<usize>,
     table_schema: SchemaRef,
     store: Arc<DynObjectStore>,
+    predicate: Option<Expression>,
 }

 impl ParquetOpener {
     pub(crate) fn new(
         batch_size: usize,
         table_schema: SchemaRef,
         store: Arc<DynObjectStore>,
+        predicate: Option<Expression>,
     ) -> Self {
         Self {
             batch_size,
             table_schema,
             limit: None,
             store,
+            predicate,
         }
     }
 }
@@ -112,7 +121,7 @@ impl FileOpener for ParquetOpener {
         // let projection = self.projection.clone();
         let table_schema = self.table_schema.clone();
         let limit = self.limit;
-
+        let predicate = self.predicate.clone();
         Ok(Box::pin(async move {
             // TODO avoid IO by converting passed file meta to ObjectMeta
             let meta = store.head(&path).await?;
@@ -124,6 +133,17 @@ impl FileOpener for ParquetOpener {
             let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
             let mut builder =
                 ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
+            if let Some(predicate) = predicate {
+                let parquet_schema = metadata.schema();
+                let parquet_physical_schema = metadata.parquet_schema();
+                let row_filter = expression_to_row_filter(
+                    predicate,
+                    &table_schema,
+                    parquet_schema,
+                    parquet_physical_schema,
+                )?;
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure this will work as well as we wish it would... In my experience, this kind of row-level pushdown doesn't consistently help performance -- even with the kind of lazy materialization arrow-rust brags about (it won't fetch column chunks until proving at least one row is needed). The reason is: Every row pays the cost of evaluating the filter, while any I/O reduction is only partial at best. We still have to fetch the columns the predicate touches, so any I/O savings come by not fetching payload columns. But that only works if the filter eliminated ALL rows from the row group. And if row groups can be skipped, then stats-based row group skipping can often do it much more cheaply (no extra I/O at all). Meanwhile, in cases that don't see any I/O reduction, pushing down the filtering just shifts complexity from the query engine to the file scanner. And that's usually a net loss because the output of the scan is likely consumed in a pipelined single pass either way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: There are absolutely cases where row-level filter pushdown is a performance win... but there are too many cases where it doesn't help or even hurts performance instead. And it's data-dependent, so hard to predict how any one query will be affected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks ryan planning on pausing this work for now :) |
||
} | ||
             if let Some(mask) = generate_mask(
                 &table_schema,
                 parquet_schema,
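For readers skimming the thread above: the pushdown under discussion is arrow-rs's RowFilter mechanism. Below is a hedged sketch of what such a filter looks like when built by hand -- it is not this PR's code. The leaf column index (3) and the `value > 8` comparison are invented for illustration, it assumes the arrow_ord crate is available for the comparison kernel, and expression_to_row_filter presumably produces something equivalent from a kernel Expression rather than this exact code.

use arrow_array::{cast::AsArray, types::Int64Type, Int64Array, RecordBatch};
use arrow_ord::cmp::gt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{async_reader::ParquetRecordBatchStreamBuilder, ProjectionMask};

fn with_example_row_filter<T>(
    builder: ParquetRecordBatchStreamBuilder<T>,
) -> ParquetRecordBatchStreamBuilder<T> {
    // Only the predicate's column (hypothetical leaf index 3) is decoded while the
    // filter runs; every row in the selected row groups still pays the evaluation cost.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [3]);
    let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
        // The batch handed to the predicate holds only the projected column.
        let values = batch.column(0).as_primitive::<Int64Type>();
        gt(values, &Int64Array::new_scalar(8))
    });
    builder.with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
}

Rows that fail the predicate are never materialized into the output batches, which is the lazy-materialization behavior the comment refers to; the I/O trade-off described above still applies.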
@@ -156,15 +176,17 @@ struct PresignedUrlOpener {
     limit: Option<usize>,
     table_schema: SchemaRef,
    client: reqwest::Client,
+    predicate: Option<Expression>,
 }

 impl PresignedUrlOpener {
-    pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
+    pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
         Self {
             batch_size,
             table_schema: schema,
             limit: None,
             client: reqwest::Client::new(),
+            predicate,
         }
     }
 }
@@ -175,6 +197,7 @@ impl FileOpener for PresignedUrlOpener {
         let table_schema = self.table_schema.clone();
         let limit = self.limit;
         let client = self.client.clone(); // uses Arc internally according to reqwest docs
+        let predicate = self.predicate.clone();

         Ok(Box::pin(async move {
             // fetch the file from the interweb
@@ -196,6 +219,16 @@ impl FileOpener for PresignedUrlOpener {
                 builder = builder.with_projection(mask)
             }

+            if let Some(predicate) = predicate {
+                let parquet_physical_schema = metadata.parquet_schema();
+                let row_filter = expression_to_row_filter(
+                    predicate,
+                    &table_schema,
+                    parquet_schema,
+                    parquet_physical_schema,
+                )?;
+                builder = builder.with_row_filter(row_filter);
+            }
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
@@ -216,14 +249,17 @@ impl FileOpener for PresignedUrlOpener {

 #[cfg(test)]
 mod tests {
+    use std::ops::Not;
     use std::path::PathBuf;

     use arrow_array::RecordBatch;
     use object_store::{local::LocalFileSystem, ObjectStore};

+    use crate::actions::{Metadata, Protocol};
     use crate::engine::arrow_data::ArrowEngineData;
     use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
     use crate::EngineData;
+    use Expression as Expr;

     use itertools::Itertools;
@@ -237,13 +273,10 @@ mod tests {
             .map(Into::into)
     }

-    #[tokio::test]
-    async fn test_read_parquet_files() {
+    async fn get_record_batches(path: &str, predicate: Option<Expression>) -> Vec<RecordBatch> {
         let store = Arc::new(LocalFileSystem::new());

-        let path = std::fs::canonicalize(PathBuf::from(
-            "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
-        )).unwrap();
+        let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
         let url = url::Url::from_file_path(path).unwrap();
         let location = Path::from(url.path());
         let meta = store.head(&location).await.unwrap();
@@ -262,14 +295,80 @@ mod tests {
         }];

         let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
-        let data: Vec<RecordBatch> = handler
-            .read_parquet_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
+        handler
+            .read_parquet_files(
+                files,
+                Arc::new(physical_schema.try_into().unwrap()),
+                predicate,
+            )
             .unwrap()
             .map(into_record_batch)
             .try_collect()
-            .unwrap();
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_read_parquet_files_with_empty_output() {
+        let predicate = Some(Expr::lt(Expr::column("value"), Expr::literal(0)));
+        let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
+        let data = get_record_batches(path, predicate).await;
+        assert_eq!(data.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_read_parquet_files_filter_data() {
+        let cases = [
+            (
+                5,
+                Some(Expr::gt_eq(Expr::column("value"), Expr::literal(5))),
+            ),
+            (1, Some(Expr::gt(Expr::column("value"), Expr::literal(8)))),
+            (10, None),
+        ];
+        let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
+        for (num_rows, predicate) in cases {
+            let data = get_record_batches(path, predicate).await;
+            assert_eq!(data.len(), 1);
+            assert_eq!(data[0].num_rows(), num_rows);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_parquet_protocol_metadata_filter() {
+        let predicate = Some(Expr::or(
+            Expr::not(Expr::is_null(Expr::column("metaData.id"))),
+            Expr::not(Expr::is_null(Expr::column("protocol.minReaderVersion"))),
+        ));
+
+        let path =
+            "./tests/data/app-txn-checkpoint/_delta_log/00000000000000000001.checkpoint.parquet";
+        let data_filtered = get_record_batches(path, predicate).await;
+        let data = get_record_batches(path, None).await;
+
+        let mut metadata_filtered: Vec<Metadata> = vec![];
+        let mut metadata_expected: Vec<Metadata> = vec![];
+        let mut protocol_filtered: Vec<Protocol> = vec![];
+        let mut protocol_expected: Vec<Protocol> = vec![];
+
+        for batch in data_filtered.into_iter().map(Into::<ArrowEngineData>::into) {
+            if let Some(metadata) = Metadata::try_new_from_data(&batch).unwrap() {
+                metadata_filtered.push(metadata);
+            } else if let Some(protocol) = Protocol::try_new_from_data(&batch).unwrap() {
+                protocol_filtered.push(protocol);
+            } else {
+                panic!("The filtered data must only have metadata or protocol entries");
+            }
+        }
+        for batch in data.into_iter().map(Into::<ArrowEngineData>::into) {
+            if let Some(metadata) = Metadata::try_new_from_data(&batch).unwrap() {
+                metadata_expected.push(metadata);
+            }
+            if let Some(protocol) = Protocol::try_new_from_data(&batch).unwrap() {
+                protocol_expected.push(protocol);
+            }
+        }
+
+        assert_eq!(data.len(), 1);
+        assert_eq!(data[0].num_rows(), 10);
+        assert_eq!(metadata_filtered, metadata_expected);
+        assert_eq!(protocol_filtered, protocol_expected);
+    }
 }
Review comment: maybe if the 'to_row_filter' above takes a ref we don't need to clone?
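For what it's worth, passing the expression by reference into expression_to_row_filter would drop one clone, but the openers still move the predicate into an async move block, so simply borrowing it from self is awkward. Another common option is to share the predicate behind an Arc so the per-file clone() is just a refcount bump. A minimal sketch of that pattern follows; Expr and Opener here are hypothetical stand-ins, not the kernel's actual types.

use std::sync::Arc;

// Hypothetical stand-in for the kernel's Expression type.
#[derive(Debug)]
enum Expr {
    Column(String),
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
}

// Hypothetical stand-in for an opener: the predicate is shared, not deep-cloned per file.
struct Opener {
    predicate: Option<Arc<Expr>>,
}

impl Opener {
    fn open(&self) {
        // Cheap: clones the Arc pointer, not the whole expression tree, so it can be
        // moved into an async move block without borrowing from `self`.
        let predicate = self.predicate.clone();
        if let Some(p) = predicate {
            println!("would build a row filter from {p:?}");
        }
    }
}

fn main() {
    let opener = Opener {
        predicate: Some(Arc::new(Expr::Gt(
            Box::new(Expr::Column("value".into())),
            Box::new(Expr::Literal(8)),
        ))),
    };
    opener.open();
}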