Implement predicate pushdown for parquet reader #349
base: main
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@           Coverage Diff           @@
##             main     #349      +/-  ##
=========================================
+ Coverage   74.03%   74.94%   +0.91%
=========================================
  Files          43       43
  Lines        8137     8542     +405
  Branches     8137     8542     +405
=========================================
+ Hits         6024     6402     +378
- Misses       1733     1738       +5
- Partials      380      402      +22
Force-pushed from 3cb3b25 to 3a6dd9b.
took a quick look
@@ -112,7 +121,7 @@ impl FileOpener for ParquetOpener {
        // let projection = self.projection.clone();
        let table_schema = self.table_schema.clone();
        let limit = self.limit;

        let predicate = self.predicate.clone();
Maybe if `to_row_filter` above takes a ref, we don't need to clone?
@@ -30,6 +32,18 @@ use crate::schema::{DataType, PrimitiveType, SchemaRef};
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};

// TODO leverage scalars / Datum
//
pub fn expression_to_row_filter(predicate: Expression) -> RowFilter {
    let arrow_predicate = ArrowPredicateFn::new(ProjectionMask::all(), move |batch| {
Here I'm taking all columns. Is there an opportunity to pass in the projection mask too? Could this bring performance gains?
I think without a projection mask, this provides no benefit. From the docs:
"RowFilter applies predicates in order, after decoding only the columns required. As predicates eliminate rows, fewer rows from subsequent columns may be required, thus potentially reducing IO and decode."
So in the case we expect to use this, it will help because it will only decode the required columns and can then skip the rest without decoding, but we'll need to either:
1. Specify which columns the predicate applies to, or
2. Work it out from the expression.
Option 2 would be better, but if that proves too tricky we could require having it passed in. Possibly some of the code in `arrow_utils::get_requested_indices` could help with getting the projection mask, but that's written for schemas and we don't necessarily have those here. You could probably construct the needed pieces, though.
Okay, so we'd definitely like projection. I was initially only thinking about the row-group skipping optimization.
Re option 2: I could imagine a query `select * from table where value > 50`. Here I believe the filter would only have the `value` column, but we are looking for all columns. If that's the case, I think option 1 is the way.
Couple of things:
- For the actual "data path" we don't push the predicate down in the "all in one" scan (see here), although an engine could choose to do that since `read_parquet_files` does take a predicate. Filtering like this is not always a win for reading parquet, so we'll need to be a bit more careful about when we actually want to push things down. Our example code also doesn't push it down yet. Once this is working, we can maybe think about providing some guidance to engines about when to push down.
- My option 1 vs. 2 above was just about figuring out which columns to project out in order to evaluate the predicate. You definitely want to project columns out. So for your example, you could either require the caller to tell you that its expression requires the `value` column, or you could look at the expression itself and notice that that's the only column it references. Figuring it out from the expression is much nicer for users, but requires more work because you need to examine the expression and see which columns it references. Regardless, you then have to figure out the positions of those columns in the parquet file, which is very non-trivial (see `get_requested_indices`). You might want to limit this to only allow filtering on root columns.
Update on this: I'm now filtering columns, but only at the root level, since `StructType::project` only works at root level. I extract the columns by recursing down the expression structure. I've put up an issue to explore projections in nested columns: #353.
Force-pushed from 8d8d930 to f68ff08.
fn test_snapshot_read_metadata() {
    let path =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
fn get_snapshot_from_path(path: &str, version: Option<Version>) -> Snapshot {
@zachschuermann I moved setup code into this helper function. How's it look?
Fixed a bug in the protocol and metadata pushdown
Force-pushed from 9bf5787 to f2c15e3.
}

#[test]
fn test_replay_protocol_metadata_filtering_predicate() {
So currently this just checks that we don't break anything, right?
I think we should probably have a more specific check in the parquet reader that manually creates the expected expression, pushes it into a filter, and ensures it does what it says it will.
Cool, I'll go do that 👍
I put up some new tests in `default/parquet.rs`. I feel like `test_parquet_protocol_metadata_filter` is a little ugly, but I don't see any easy ways to simplify it or make it more reusable.
Force-pushed from 4987a5a to 7842372.
    parquet_schema,
    parquet_physical_schema,
)?;
builder = builder.with_row_filter(row_filter);
I'm not sure this will work as well as we wish it would...
In my experience, this kind of row-level pushdown doesn't consistently help performance -- even with the kind of lazy materialization arrow-rust brags about (it won't fetch column chunks until proving at least one row is needed).
The reason is: every row pays the cost of evaluating the filter, while any I/O reduction is only partial at best. We still have to fetch the columns the predicate touches, so any I/O savings come from not fetching payload columns. But that only works if the filter eliminates ALL rows from a row group. And if row groups can be skipped, stats-based row-group skipping can often do it much more cheaply (no extra I/O at all).
Meanwhile, in cases that don't see any I/O reduction, pushing down the filtering just shifts complexity from the query engine to the file scanner. And that's usually a net loss because the output of the scan is likely consumed in a pipelined single pass either way.
Note: There are absolutely cases where row-level filter pushdown is a performance win... but there are too many cases where it doesn't help or even hurts performance instead. And it's data-dependent, so hard to predict how any one query will be affected.
Thanks Ryan, planning on pausing this work for now :)
This PR implements predicate pushdown for the parquet reader used in both the sync and default engines.
Closes: #341