feat: support inner iejoin #12754
base: main
```rust
    context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
    // Sort the input by condition1 first, then gather all output batches.
    let sort_plan = Arc::new(SortExec::new(vec![sort_expr], input));
    let record_batches = collect(sort_plan, context).await?;
```
To reduce cost, we need to sort the data by condition1 first and collect all RecordBatches. But I found that the `collect` function doesn't collect all the data from SortExec (it only returns one RecordBatch, and I don't know why). Which function should be used here?
This approach doesn't work, so I added SortExec directly in the planner (`let sorted_left = Arc::new(SortExec::new(` …).
Basically ready for review; more sqllogictests will be added later.
```sql
SELECT
  i AS id,
  i AS begin,
  i + 1 AS end
FROM unnest(generate_series(1, 1000001)) tbl(i);
```
This statement works in my local environment but fails in CI.
Comparison data:
Create the dataset with `datafusion-cli -f iedataset.sql`, then execute:
```rust
fn required_input_distribution(&self) -> Vec<Distribution> {
    // Require each input (left and right) to be collapsed into a single partition.
    vec![Distribution::SinglePartition, Distribution::SinglePartition]
}
```
The inputs are sorted by condition1 first, so I require SinglePartition here, but I'm not entirely sure about it.
```rust
// Use a BTreeMap to maintain all p[i] for i in 0..j; map[s] = t means range [s, t) is valid.
// Our target is to find all pairs (i, j) such that i < j and p[i] < p[j], where i is from the left table and j is from the right table.
// range_map uses the end index as key and the start index as value to represent an interval [start, end).
let mut range_map = BTreeMap::<u64, u64>::new();
```
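A minimal sketch of how a range map like this could answer the "all i < j with p[i] < p[j]" query, assuming `p` is already the permutation (rows ordered by condition1, `p[i]` = rank by condition2, a permutation of `0..n`) and that a hypothetical `is_left` slice marks which table each row came from. `IntervalSet` and `inversion_pairs` are illustrative names, not the PR's code; the map follows the "key = end, value = start" convention from the comment.

```rust
use std::collections::BTreeMap;

/// Hypothetical set of u64 values stored as disjoint half-open intervals [start, end).
/// Key = end, value = start.
#[derive(Default)]
struct IntervalSet {
    map: BTreeMap<u64, u64>,
}

impl IntervalSet {
    /// Insert a value `x` that is not yet present, merging with adjacent intervals.
    fn insert(&mut self, x: u64) {
        // An interval ending exactly at x, i.e. [s, x), is extended to cover x.
        let start = self.map.remove(&x).unwrap_or(x);
        // An interval starting exactly at x + 1 is merged on the right.
        let mut end = x + 1;
        let right = self.map.range(x + 2..).next().map(|(&e, &s)| (e, s));
        if let Some((e, s)) = right {
            if s == x + 1 {
                self.map.remove(&e);
                end = e;
            }
        }
        self.map.insert(end, start);
    }

    /// Call `f` on every stored value strictly below `limit`, in ascending order.
    fn for_each_below(&self, limit: u64, mut f: impl FnMut(u64)) {
        for (&end, &start) in &self.map {
            if start >= limit {
                break;
            }
            for v in start..end.min(limit) {
                f(v);
            }
        }
    }
}

/// Hypothetical helper: all (i, j) with i < j, p[i] < p[j], i from left, j from right.
fn inversion_pairs(p: &[u64], is_left: &[bool]) -> Vec<(usize, usize)> {
    // Inverse permutation: pos[v] is the row index i with p[i] == v.
    let mut pos = vec![0usize; p.len()];
    for (i, &v) in p.iter().enumerate() {
        pos[v as usize] = i;
    }
    let mut set = IntervalSet::default();
    let mut out = Vec::new();
    for j in 0..p.len() {
        if is_left[j] {
            // Left rows become visible to every later right row.
            set.insert(p[j]);
        } else {
            // Right rows report every earlier left row with a smaller p value.
            set.for_each_below(p[j], |v| out.push((pos[v as usize], j)));
        }
    }
    out
}
```

Because adjacent values collapse into one interval, long runs of inserted positions cost a single map entry, which is one plausible reason this can beat a plain bitmap when the visited positions are clustered.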
In my tests, using a BTreeMap is a little faster than using a bitmap (https://github.com/my-vegetable-has-exploded/arrow-datafusion/compare/iejoin...my-vegetable-has-exploded:arrow-datafusion:iejoin-bitmap?expand=1): 8.5s -> 7.5s. Currently, though, the main cost is sorting.
PTAL @xudong963
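For comparison with the BTreeMap approach, a packed bitmap could serve the same "visit every set position below a limit" query. This is a hedged sketch with a hypothetical `Bitmap` type; the linked `iejoin-bitmap` branch may be implemented differently.

```rust
/// Hypothetical packed bitmap over positions 0..n, one bit per position.
struct Bitmap {
    words: Vec<u64>,
}

impl Bitmap {
    fn new(n: usize) -> Self {
        Bitmap { words: vec![0; (n + 63) / 64] }
    }

    /// Mark position `x` as visited.
    fn set(&mut self, x: usize) {
        self.words[x / 64] |= 1u64 << (x % 64);
    }

    /// Call `f` on every set position strictly below `limit`, in ascending order.
    fn for_each_below(&self, limit: usize, mut f: impl FnMut(usize)) {
        for (w, &word) in self.words.iter().enumerate() {
            let base = w * 64;
            if base >= limit {
                break;
            }
            // Keep only bits for positions < limit in the last relevant word.
            let mut bits = if limit - base >= 64 {
                word
            } else {
                word & ((1u64 << (limit - base)) - 1)
            };
            while bits != 0 {
                let t = bits.trailing_zeros() as usize;
                f(base + t);
                bits &= bits - 1; // clear the lowest set bit
            }
        }
    }
}
```

The bitmap scan touches every word below the limit even when few bits are set, whereas the interval map only touches stored intervals, so the relative speed depends on how dense the visited positions are.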
Really impressive work!
My understanding of how IEJoin works, using this query:

```sql
SELECT r.id, s.id
FROM 'employees.parquet' r, 'employees.parquet' s
WHERE r.salary < s.salary AND r.tax > s.tax
```

Conceptually, it executes in 3 steps:

It still has input_size^2 complexity; however, the N^2 step (step 3) just loops through a bit array, which is far more efficient than the N^2 join-condition evaluations a nested loop join (NLJ) would do, and that is what makes this implementation efficient. This PR only uses DataFusion's existing sort executor for step 1, which can be parallelized to run on multiple partitions, and everything else is left inside the new …
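To make the sort / rank / bit-array idea concrete for this query, here is a hedged, single-threaded sketch (not the PR's implementation). `Emp` and `ie_self_join` are hypothetical names, and integer salaries and taxes are assumed. Ties are handled by ordering `s` rows before `r` rows in both orderings, so both inequalities stay strict.

```rust
use std::cmp::Reverse;

/// Hypothetical row type for the example query.
#[derive(Clone, Copy)]
struct Emp {
    id: u32,
    salary: i64,
    tax: i64,
}

/// Sketch: all (r.id, s.id) with r.salary < s.salary AND r.tax > s.tax.
fn ie_self_join(emps: &[Emp]) -> Vec<(u32, u32)> {
    // Every employee appears once as s (side 0) and once as r (side 1).
    let mut rows: Vec<(usize, u8)> = (0..emps.len())
        .flat_map(|i| [(i, 0u8), (i, 1u8)])
        .collect();
    let n = rows.len();
    // Order by salary; at equal salary, s precedes r so r.salary < s.salary stays strict.
    rows.sort_by_key(|&(i, side)| (emps[i].salary, side));
    // Rank positions by tax descending; at equal tax, s precedes r so only r rows
    // with strictly larger tax get a smaller rank than a given s row.
    let mut by_tax: Vec<usize> = (0..n).collect();
    by_tax.sort_by_key(|&k| (Reverse(emps[rows[k].0].tax), rows[k].1));
    let mut rank = vec![0usize; n];
    for (r, &k) in by_tax.iter().enumerate() {
        rank[k] = r;
    }
    // Scan in salary order with a bit array indexed by tax rank.
    let mut bits = vec![false; n];
    let mut out = Vec::new();
    for k in 0..n {
        let (i, side) = rows[k];
        if side == 1 {
            bits[rank[k]] = true; // r row: visible to later (higher-salary) s rows
        } else {
            // s row: every visible r row with a smaller rank has a larger tax.
            for r in 0..rank[k] {
                if bits[r] {
                    out.push((emps[rows[by_tax[r]].0].id, emps[i].id));
                }
            }
        }
    }
    out
}
```

The inner loop over `bits` is the N^2 part, but each step is a cheap flag check rather than a full predicate evaluation.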
If we use a BTreeMap to maintain it, the complexity will be
I don't know how to extract the second sort outside.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
I'm planning on taking a look at this over the next week or so, but it will take a little time for me to get up to speed on the details of what you're trying to do here. Can you add any descriptive text to the PR so I don't need to wade through the entire conversation on the issue?
I'm really glad that you're willing to review this PR, @timsaucer. It might appear quite lengthy. Also, I've been quite busy recently preparing my graduation thesis, so my responses might not be timely. This PR adds support for inner IEJoin, which optimizes joins that have no equality join conditions but two or more inequality conditions, improving the performance of such queries. The main idea of the IEJoin algorithm is to convert a join with inequality conditions into the problem of finding ordered pairs (inversion pairs) of a permutation. For example,
Conceptually, it executes in 3 steps:
If we use a BTreeMap to maintain all p[i] where i < j, we can get all pairs in
To parallelize the above algorithm, we can first sort t1 and t2 by time (condition 1), repartition the data into N partitions, and then join each pair of partitions t1[i] and t2[j]. If the minimum time of t1[i] is greater than the maximum time of t2[j], we can skip joining t1[i] and t2[j], because condition 1 guarantees that they produce no join results. So I added an optimizer to ensure the input data is sorted by condition1.
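A hedged sketch of the partition-pruning rule described above, assuming condition 1 has the form `t1.time < t2.time` and that each sorted partition is summarized by the min and max of its `time` values. `Bounds` and `partition_pairs` are hypothetical names. The check mirrors the comment (skip only when `min > max`), which is conservative for a strict `<`.

```rust
/// Hypothetical summary of one sorted partition: min/max of the condition-1 column.
#[derive(Clone, Copy)]
struct Bounds {
    min: i64,
    max: i64,
}

/// Return the (left partition, right partition) pairs that still need to be joined.
fn partition_pairs(left: &[Bounds], right: &[Bounds]) -> Vec<(usize, usize)> {
    let mut pairs = Vec::new();
    for (i, l) in left.iter().enumerate() {
        for (j, r) in right.iter().enumerate() {
            // Every left time exceeds every right time, so t1.time < t2.time never holds.
            if l.min > r.max {
                continue;
            }
            pairs.push((i, j));
        }
    }
    pairs
}
```

With left and right both split into `[0, 9]` and `[10, 19]`, only the pair (1, 0) is pruned; the remaining pairs can then be joined independently.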
By the way, this perf result shows that sorting during permutation computation is currently the main cost.
Which issue does this PR close?
ref #8393
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?