Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support inner iejoin #12754

Open
wants to merge 30 commits into
base: main
Choose a base branch
from

Conversation

my-vegetable-has-exploded
Copy link
Contributor

Which issue does this PR close?

ref #8393

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate labels Oct 4, 2024
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Oct 4, 2024
context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
let sort_plan = Arc::new(SortExec::new(vec![sort_expr], input));
let record_batches = collect(sort_plan, context).await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reduce some cost, we need to sort data by condition1 firstly and collect all RecordBatches. But I found the collect function don't collect all data from SortExec(it only return one recordbatch, i don't know why), which function should be use here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way don't works. I add SortExec directly in planner(

let sorted_left = Arc::new(SortExec::new(
).

@my-vegetable-has-exploded my-vegetable-has-exploded marked this pull request as ready for review October 5, 2024 17:13
@my-vegetable-has-exploded
Copy link
Contributor Author

Basicly ready for review, more sqllogicaltests will be add later.

i AS id,
i AS begin,
i + 1 AS end
FROM unnest(generate_series(1, 1000001)) tbl(i);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This statement works well on my local environment, but failed in ci.

@my-vegetable-has-exploded
Copy link
Contributor Author

my-vegetable-has-exploded commented Oct 6, 2024

comparison

data:
iedataset.sql

CREATE
OR REPLACE EXTERNAL TABLE employees(
        id INT, 
        salary INT, 
        tax INT)
STORED AS PARQUET
LOCATION 'employees.parquet';

INSERT INTO employees (id, salary, tax)
SELECT
    facts.id AS id,
    salary,
    (salary / 10 + CASE WHEN random() <= 0.005 THEN round(random() * 100000)::INTEGER ELSE 0 END)::INTEGER AS tax
FROM (
    SELECT
        id,
        100 * id AS salary
    FROM (SELECT UNNEST(range(1, 500000))) tbl(id)
) facts;

COPY employees TO 'employees.parquet' STORED AS PARQUET;

SELECT COUNT(*) FROM 'employees.parquet' r;

create dataset by

datafusion-cli -f iedataset.sql

execute:
iejoin.sql

SELECT COUNT(*) FROM (
	SELECT r.id, s.id
	FROM 'employees.parquet' r, 'employees.parquet' s
	WHERE r.salary < s.salary AND r.tax > s.tax
) q1;

图片

@my-vegetable-has-exploded my-vegetable-has-exploded marked this pull request as draft October 8, 2024 08:27
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition, Distribution::SinglePartition]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inputs will be sorted by condition1 firstly, so I required SinglePartition here. But not so sure about it.

@my-vegetable-has-exploded
Copy link
Contributor Author

my-vegetable-has-exploded commented Oct 13, 2024

perf

It seems the main cost is sorting.

// use btree map to maintain all p\[i\], for i in 0..j, map\[s\]=t means range \[s, t\) is valid
// our target is to find all pair(i, j) that i<j and p\[i\] < p\[j\] and i from left table and j from right table here
// range_map use key as end index and value as start index to represent a interval [start, end)
let mut range_map = BTreeMap::<u64, u64>::new();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my test, use btreemap is a little faster than bitmap (https://github.com/my-vegetable-has-exploded/arrow-datafusion/compare/iejoin...my-vegetable-has-exploded:arrow-datafusion:iejoin-bitmap?expand=1), 8.5s -> 7.5s. Currently, the main cost is sorting though.

@my-vegetable-has-exploded my-vegetable-has-exploded marked this pull request as ready for review October 16, 2024 13:14
@my-vegetable-has-exploded
Copy link
Contributor Author

ptal @xudong963

@2010YOUY01
Copy link
Contributor

Really impressive work!

  1. I suggest opening another PR for benchmarks only, it can get merged easily and also help attract more attention.

  2. I have a question: (just skimmed through the duckdb blog, haven't fully understood the algorithm, please correct me if I'm wrong)

My understandings on how IEJoin works:
For query in above benchmark's example

SELECT r.id, s.id
	FROM 'employees.parquet' r, 'employees.parquet' s
	WHERE r.salary < s.salary AND r.tax > s.tax

Conceptually it's executing in 3 steps:

  1. Sort (r union s) on salary, add one column for salary_rank
  2. Sort (r union s) again on tax, the salary_rank in step 1 gets permutated into Permutation Array
  3. Construct a bit array (same length as (r union s)), and do nest loop on it using Permutation Array information, to find matches

It's still input_size^2 complexity, however the N^2 step (step 3) is just looping through a bit array, it's way more efficient than do N^2 join condition evaluation if it's using NLJ, to make this implementation efficient.

This PR only use datafusion's existing sort executer to do step 1, this can be parallelized to run on multiple partitions, and everything else is left inside new IEJoin executor, including union all two input tables, step 2 sorting, and looping over bit vector.
Is it possible to extract the second sorting outside IEJoin executor, and let the existing sort executor to do this? This step then can be more easily parallelized, or does IEJoin executor have a better way to parallelize its job?

@my-vegetable-has-exploded
Copy link
Contributor Author

my-vegetable-has-exploded commented Oct 22, 2024

It's still input_size^2 complexity, however the N^2 step (step 3) is just looping through a bit array, it's way more efficient than do N^2 join condition evaluation if it's using NLJ, to make this implementation efficient.

If use btreemap to maintain it, the complexity will be $NlogN + OutputSize$. And the bitmap[i..n] maybe sparse in many scenes, use bitmap with bloom filter(bloomfilter[i]=0 means bitmap[ki..k(i+1)] are all 0) can skip lots of useless value, and bitmap is very cache-friendly.

Is it possible to extract the second sorting outside IEJoin executor, and let the existing sort executor to do this? This step then can be more easily parallelized, or does IEJoin executor have a better way to parallelize its job?

I don't know how to use extract the second sorting outside IEJoin executor though. And currently parallelize mechanism is split the left table into n blocks and right table into m blocks (left table and right table already sort by condition1). Then we have n*m part data to compute, and we can check whether l[i] block and r[j] block can produce any result pair satisfying condition1 in O(1) complexity, if not result pair can be produced, we can just skip the block pair l[i] and r[j]. If you have a better idea, I'd love to hear about it.

Copy link

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label Dec 22, 2024
@timsaucer
Copy link
Contributor

I'm planning on taking a look at this over the next week or so, but it will take a little time for me to get up to speed on the details of what you're trying to do here. Can you add any descriptive text to the PR so I don't need to wade through the entire conversation on the issue?

@timsaucer timsaucer removed the Stale PR has not had any activity for some time label Dec 23, 2024
@my-vegetable-has-exploded
Copy link
Contributor Author

my-vegetable-has-exploded commented Dec 29, 2024

I'm planning on taking a look at this over the next week or so, but it will take a little time for me to get up to speed on the details of what you're trying to do here. Can you add any descriptive text to the PR so I don't need to wade through the entire conversation on the issue?

I'm really glad that you're willing to review this PR @timsaucer. This PR might appear to be quite lengthy. Additionally, I've been quite busy recently preparing for my graduation thesis, so my responses might not be timely.

And this pr wants to support for inner IEJoin, optimizing join operations without equality join conditions but with two or more inequality conditions, and improving the performance of specific queries.

The main idea of the IEJoin algorithm is to convert the join operation with inequality conditions into an ordered pair/inversion pair of permutation problem.

For example,

SELECT t1.t id, t2.t id
FROM west t1, west t2
WHERE t1.time < t2.time AND t1.cost < t2.cost

Conceptually it's executing in 3 steps:
1、Sort (r union s) on time in ascend order, add one column for time_rank(1..n)
2、Sort (r union s with time_rank) again on cost in ascend order, the time_rank in step 1 gets permutated into Permutation Array(represented as p)
3、Compute the ordered pair of permutation array p. For a pair (i, j) in l2, if i < j then e_i.cost < e_j.cost because l2 is sorted by cost in ascending order. And if p[i] < p[j], then e_i.time < e_j.time because l1 is sorted by time in ascending order.

If we use btreemap to maintain all the p[i] where i<j, we can get all pairs in $NlogN+OutputSize$. And you can find more detailed examples in the comments.

To parallel the above algorithm, we can sort t1 and t2 by time (condition 1) firstly, and repartition the data into N partitions, then join t1[i] and t2[j] respectively. And if the minimum time of t1[i] is greater than the maximum time of t2[j], we can skip the join of t1[i] and t2[j] because there is no join result between them according to condition 1. So I add the optimizer to ensure the input data has been sorted by condition1.

perf

It seems the main cost is sorting.

By the way, this perf result shows that the sort process in permutation computing is the main cost currently.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants