Implement predicate pushdown for parquet reader #349

Open · wants to merge 17 commits into base: main
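For context, the pushdown in this PR rides on parquet-rs's RowFilter / ArrowPredicateFn API: a predicate closure is evaluated against a projection of each decoded batch, and only rows it marks true are materialized for the remaining columns. A minimal, self-contained sketch of that mechanism (the file name, column index, and threshold are illustrative assumptions, not taken from this PR):

use std::fs::File;

use arrow_array::{Array, BooleanArray, Int32Array};
use arrow_schema::ArrowError;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("example.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Only the predicate's column (leaf index 0 here) is decoded while filtering.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| ArrowError::CastError("expected Int32 column".into()))?;
        // Keep rows where the value is >= 5; rows with null values do not match.
        Ok(col.iter().map(|v| v.map(|x| x >= 5)).collect::<BooleanArray>())
    });

    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        println!("{} rows passed the filter", batch?.num_rows());
    }
    Ok(())
}

The PR's expression_to_row_filter below plays the role of this closure, translating a kernel Expression into the BooleanArray the reader expects.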
62 changes: 61 additions & 1 deletion kernel/src/engine/arrow_expression.rs
@@ -1,4 +1,5 @@
//! Expression handling based on arrow-rs compute kernels.
use std::collections::HashSet;
use std::sync::Arc;

use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene};
@@ -19,18 +20,77 @@ use arrow_schema::{
};
use arrow_select::concat::concat;
use itertools::Itertools;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::ProjectionMask;

use super::arrow_conversion::LIST_ARRAY_ROOT;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::ensure_data_types;
use crate::engine::arrow_utils::prim_array_cmp;
use crate::engine::arrow_utils::{ensure_data_types, generate_mask, get_requested_indices};
use crate::error::{DeltaResult, Error};
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
use crate::schema::{DataType, PrimitiveType, SchemaRef};
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use parquet::schema::types::SchemaDescriptor;

// TODO leverage scalars / Datum

pub fn expression_to_row_filter(
predicate: Expression,
requested_schema: &SchemaRef,
parquet_schema: &ArrowSchemaRef,
parquet_physical_schema: &SchemaDescriptor,
) -> DeltaResult<RowFilter> {
let cols = get_columns_from_expression(&predicate);
let expr_schema = requested_schema.project(&cols)?;
let (indices, _) = get_requested_indices(&expr_schema, parquet_schema)?;
let projection_mask = generate_mask(
&expr_schema,
parquet_schema,
parquet_physical_schema,
&indices,
)
.unwrap_or(ProjectionMask::all());
let arrow_predicate = ArrowPredicateFn::new(projection_mask, move |batch| {
downcast_to_bool(
&evaluate_expression(&predicate, &batch, None)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))?,
)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))
.cloned()
});
Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
}

pub fn get_columns_from_expression(expr: &Expression) -> Vec<&str> {
fn get_columns_from_expression_impl<'a>(expr: &'a Expression, out: &mut HashSet<&'a str>) {
match expr {
Expression::Column(col_name) => {
let root_name = col_name.split('.').next().unwrap_or(col_name);
out.insert(root_name);
}
Expression::Struct(fields) => fields
.iter()
.for_each(|expr| get_columns_from_expression_impl(expr, out)),
Expression::BinaryOperation { op: _, left, right } => {
get_columns_from_expression_impl(left, out);
get_columns_from_expression_impl(right, out);
}
Expression::UnaryOperation { op: _, expr } => {
get_columns_from_expression_impl(expr, out)
}
Expression::VariadicOperation { op: _, exprs } => exprs
.iter()
.for_each(|expr| get_columns_from_expression_impl(expr, out)),
Expression::Literal(_) => (),
}
}
let mut out = HashSet::new();
get_columns_from_expression_impl(expr, &mut out);
out.into_iter().collect_vec()
}
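// Illustration (editorial, not part of the diff): for a predicate such as
//   Expr::not(Expr::is_null(Expr::column("metaData.id")))
// this returns ["metaData"]. Only root column names are collected, because the
// requested schema is projected by top-level field before the projection mask
// is built.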

fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> {
arr.as_any()
.downcast_ref::<BooleanArray>()
127 changes: 113 additions & 14 deletions kernel/src/engine/default/parquet.rs
@@ -12,6 +12,7 @@ use parquet::arrow::arrow_reader::{
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_expression::expression_to_row_filter;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::default::executor::TaskExecutor;
use crate::schema::SchemaRef;
@@ -47,7 +48,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
_predicate: Option<Expression>,
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator> {
if files.is_empty() {
return Ok(Box::new(std::iter::empty()));
@@ -62,11 +63,16 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
// -> parse to parquet
// SAFETY: we did is_empty check above, this is ok.
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
"http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
"http" | "https" => Box::new(PresignedUrlOpener::new(
1024,
physical_schema.clone(),
predicate,
)),
_ => Box::new(ParquetOpener::new(
1024,
physical_schema.clone(),
self.store.clone(),
predicate,
)),
};
FileStream::new_async_read_iterator(
@@ -86,19 +92,22 @@ struct ParquetOpener {
limit: Option<usize>,
table_schema: SchemaRef,
store: Arc<DynObjectStore>,
predicate: Option<Expression>,
}

impl ParquetOpener {
pub(crate) fn new(
batch_size: usize,
table_schema: SchemaRef,
store: Arc<DynObjectStore>,
predicate: Option<Expression>,
) -> Self {
Self {
batch_size,
table_schema,
limit: None,
store,
predicate,
}
}
}
@@ -112,7 +121,7 @@ impl FileOpener for ParquetOpener {
// let projection = self.projection.clone();
let table_schema = self.table_schema.clone();
let limit = self.limit;

let predicate = self.predicate.clone();
[Collaborator] maybe if the 'to_row_filter' above takes ref we don't need to clone?
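A possible variation on that suggestion (a sketch mirroring expression_to_row_filter above; the function name is hypothetical): share the predicate behind an Arc so each opened file clones a pointer rather than the whole expression tree. The closure still owns its copy, since RowFilter stores it as a boxed, long-lived predicate.

fn row_filter_from_shared(
    predicate: std::sync::Arc<Expression>,
    projection_mask: ProjectionMask,
) -> RowFilter {
    let arrow_predicate = ArrowPredicateFn::new(projection_mask, move |batch| {
        // The Arc keeps the expression alive inside the closure without a deep clone.
        downcast_to_bool(
            &evaluate_expression(&predicate, &batch, None)
                .map_err(|err| ArrowError::ExternalError(Box::new(err)))?,
        )
        .map_err(|err| ArrowError::ExternalError(Box::new(err)))
        .cloned()
    });
    RowFilter::new(vec![Box::new(arrow_predicate)])
}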

Ok(Box::pin(async move {
// TODO avoid IO by converting passed file meta to ObjectMeta
let meta = store.head(&path).await?;
@@ -124,6 +133,17 @@
let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
let mut builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
if let Some(predicate) = predicate {
let parquet_schema = metadata.schema();
let parquet_physical_schema = metadata.parquet_schema();
let row_filter = expression_to_row_filter(
predicate,
&table_schema,
parquet_schema,
parquet_physical_schema,
)?;
builder = builder.with_row_filter(row_filter);
[Collaborator] I'm not sure this will work as well as we wish it would...

In my experience, this kind of row-level pushdown doesn't consistently help performance -- even with the kind of lazy materialization arrow-rust brags about (it won't fetch column chunks until proving at least one row is needed).

The reason is: every row pays the cost of evaluating the filter, while any I/O reduction is only partial at best. We still have to fetch the columns the predicate touches, so any I/O savings come from not fetching payload columns. But that only works if the filter eliminated ALL rows from the row group. And if row groups can be skipped, then stats-based row-group skipping can often do it much more cheaply (no extra I/O at all).

Meanwhile, in cases that don't see any I/O reduction, pushing down the filtering just shifts complexity from the query engine to the file scanner. And that's usually a net loss because the output of the scan is likely consumed in a pipelined single pass either way.

[Collaborator] Note: there are absolutely cases where row-level filter pushdown is a performance win... but there are too many cases where it doesn't help or even hurts performance instead. And it's data-dependent, so it's hard to predict how any one query will be affected.

[Collaborator] thanks ryan, planning on pausing this work for now :)
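For comparison, the stats-based row-group skipping mentioned above works purely from footer metadata, before any column I/O. A rough sketch against parquet-rs follows; the exact Statistics accessors vary between parquet crate versions, and the column index and threshold are illustrative assumptions.

use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics;

// Keep only row groups whose i32 min/max stats for `col_idx` could contain a row
// satisfying `value >= threshold`; groups without usable stats are kept conservatively.
fn prune_row_groups(metadata: &ParquetMetaData, col_idx: usize, threshold: i32) -> Vec<usize> {
    metadata
        .row_groups()
        .iter()
        .enumerate()
        .filter_map(|(i, rg)| match rg.column(col_idx).statistics() {
            // max < threshold means no row in this group can match, so skip it.
            Some(Statistics::Int32(s)) if s.has_min_max_set() && *s.max() < threshold => None,
            _ => Some(i),
        })
        .collect()
}

// Possible usage before building the reader:
//   let keep = prune_row_groups(builder.metadata(), /*col_idx*/ 0, /*threshold*/ 5);
//   builder = builder.with_row_groups(keep);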

}
if let Some(mask) = generate_mask(
&table_schema,
parquet_schema,
@@ -156,15 +176,17 @@ struct PresignedUrlOpener {
limit: Option<usize>,
table_schema: SchemaRef,
client: reqwest::Client,
predicate: Option<Expression>,
}

impl PresignedUrlOpener {
pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
Self {
batch_size,
table_schema: schema,
limit: None,
client: reqwest::Client::new(),
predicate,
}
}
}
@@ -175,6 +197,7 @@ impl FileOpener for PresignedUrlOpener {
let table_schema = self.table_schema.clone();
let limit = self.limit;
let client = self.client.clone(); // uses Arc internally according to reqwest docs
let predicate = self.predicate.clone();

Ok(Box::pin(async move {
// fetch the file from the interweb
@@ -196,6 +219,16 @@
builder = builder.with_projection(mask)
}

if let Some(predicate) = predicate {
let parquet_physical_schema = metadata.parquet_schema();
let row_filter = expression_to_row_filter(
predicate,
&table_schema,
parquet_schema,
parquet_physical_schema,
)?;
builder = builder.with_row_filter(row_filter);
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
@@ -216,14 +249,17 @@

#[cfg(test)]
mod tests {
use std::ops::Not;
use std::path::PathBuf;

use arrow_array::RecordBatch;
use object_store::{local::LocalFileSystem, ObjectStore};

use crate::actions::{Metadata, Protocol};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::EngineData;
use Expression as Expr;

use itertools::Itertools;

@@ -237,13 +273,10 @@
.map(Into::into)
}

#[tokio::test]
async fn test_read_parquet_files() {
async fn get_record_batches(path: &str, predicate: Option<Expression>) -> Vec<RecordBatch> {
let store = Arc::new(LocalFileSystem::new());

let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
)).unwrap();
let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
let url = url::Url::from_file_path(path).unwrap();
let location = Path::from(url.path());
let meta = store.head(&location).await.unwrap();
@@ -262,14 +295,80 @@
}];

let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let data: Vec<RecordBatch> = handler
.read_parquet_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
handler
.read_parquet_files(
files,
Arc::new(physical_schema.try_into().unwrap()),
predicate,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();
.unwrap()
}

#[tokio::test]
async fn test_read_parquet_files_with_empty_output() {
let predicate = Some(Expr::lt(Expr::column("value"), Expr::literal(0)));
let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
let data = get_record_batches(path, predicate).await;
assert_eq!(data.len(), 0);
}

#[tokio::test]
async fn test_read_parquet_files_filter_data() {
let cases = [
(
5,
Some(Expr::gt_eq(Expr::column("value"), Expr::literal(5))),
),
(1, Some(Expr::gt(Expr::column("value"), Expr::literal(8)))),
(10, None),
];
let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
for (num_rows, predicate) in cases {
let data = get_record_batches(path, predicate).await;
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), num_rows);
}
}

#[tokio::test]
async fn test_parquet_protocol_metadata_filter() {
let predicate = Some(Expr::or(
Expr::not(Expr::is_null(Expr::column("metaData.id"))),
Expr::not(Expr::is_null(Expr::column("protocol.minReaderVersion"))),
));

let path =
"./tests/data/app-txn-checkpoint/_delta_log/00000000000000000001.checkpoint.parquet";
let data_filtered = get_record_batches(path, predicate).await;
let data = get_record_batches(path, None).await;

let mut metadata_filtered: Vec<Metadata> = vec![];
let mut metadata_expected: Vec<Metadata> = vec![];
let mut protocol_filtered: Vec<Protocol> = vec![];
let mut protocol_expected: Vec<Protocol> = vec![];

for batch in data_filtered.into_iter().map(Into::<ArrowEngineData>::into) {
if let Some(metadata) = Metadata::try_new_from_data(&batch).unwrap() {
metadata_filtered.push(metadata);
} else if let Some(protocol) = Protocol::try_new_from_data(&batch).unwrap() {
protocol_filtered.push(protocol);
} else {
panic!("The filtered data must only have metadata or protocol entries");
}
}
for batch in data.into_iter().map(Into::<ArrowEngineData>::into) {
if let Some(metadata) = Metadata::try_new_from_data(&batch).unwrap() {
metadata_expected.push(metadata);
}
if let Some(protocol) = Protocol::try_new_from_data(&batch).unwrap() {
protocol_expected.push(protocol);
}
}

assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 10);
assert_eq!(metadata_filtered, metadata_expected);
assert_eq!(protocol_filtered, protocol_expected);
}
}
17 changes: 15 additions & 2 deletions kernel/src/engine/sync/parquet.rs
@@ -5,13 +5,18 @@ use tracing::debug;
use url::Url;

use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_expression::expression_to_row_filter;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::schema::SchemaRef;
use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};

pub(crate) struct SyncParquetHandler;

fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult<ArrowEngineData> {
fn try_create_from_parquet(
schema: SchemaRef,
location: Url,
predicate: Option<Expression>,
) -> DeltaResult<ArrowEngineData> {
let file = File::open(
location
.to_file_path()
@@ -25,6 +30,13 @@ fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult<ArrowEngineData> {
{
builder = builder.with_projection(mask);
}
if let Some(predicate) = predicate {
let parquet_schema = metadata.schema();
let parquet_physical_schema = metadata.parquet_schema();
let row_filter =
expression_to_row_filter(predicate, &schema, parquet_schema, parquet_physical_schema)?;
builder = builder.with_row_filter(row_filter);
}
let mut reader = builder.build()?;
let data = reader
.next()
@@ -46,7 +58,8 @@
}
let locations: Vec<_> = files.iter().map(|file| file.location.clone()).collect();
Ok(Box::new(locations.into_iter().map(move |location| {
try_create_from_parquet(schema.clone(), location).map(|d| Box::new(d) as _)
try_create_from_parquet(schema.clone(), location, predicate.clone())
.map(|d| Box::new(d) as _)
})))
}
}