Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support inner iejoin #12754

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0e478de
init iejoinexec.
my-vegetable-has-exploded Sep 21, 2024
9b552cc
init executionplan.
my-vegetable-has-exploded Sep 22, 2024
12da70e
wip
my-vegetable-has-exploded Sep 28, 2024
eca6cf8
basic implement iejoinstream.
my-vegetable-has-exploded Sep 30, 2024
d7d3dfd
..
my-vegetable-has-exploded Oct 1, 2024
b3b0e69
basic init.
my-vegetable-has-exploded Oct 1, 2024
2dd0635
impl planner.
my-vegetable-has-exploded Oct 2, 2024
24e516f
fix column index.
my-vegetable-has-exploded Oct 3, 2024
a8b509b
add ut.
my-vegetable-has-exploded Oct 4, 2024
0c3a893
fix swap operator.
my-vegetable-has-exploded Oct 4, 2024
ffbf265
add sqllogicaltest.
my-vegetable-has-exploded Oct 4, 2024
f04021d
fix cargo.lock.
my-vegetable-has-exploded Oct 4, 2024
acd8474
rm useless dependency.
my-vegetable-has-exploded Oct 4, 2024
007c00b
fix sort partition.
my-vegetable-has-exploded Oct 5, 2024
ca296d3
fix test string.
my-vegetable-has-exploded Oct 5, 2024
b6633a7
fix tests & clippy
my-vegetable-has-exploded Oct 5, 2024
8110ecd
fix test contain.
my-vegetable-has-exploded Oct 5, 2024
4d48810
fix sort removed.
my-vegetable-has-exploded Oct 5, 2024
44d5f76
add more tests.
my-vegetable-has-exploded Oct 6, 2024
246811a
test generate_series.
my-vegetable-has-exploded Oct 8, 2024
4c3bd6c
test generate_series.
my-vegetable-has-exploded Oct 8, 2024
8c819a9
add more comments.
my-vegetable-has-exploded Oct 8, 2024
1738495
add metric
my-vegetable-has-exploded Oct 13, 2024
a67a720
fix permutation len.
my-vegetable-has-exploded Oct 13, 2024
9fcd867
fix metric
my-vegetable-has-exploded Oct 13, 2024
698fb5c
fix comment.
my-vegetable-has-exploded Oct 13, 2024
b246e7e
little update.
my-vegetable-has-exploded Oct 13, 2024
cde1f8f
use left_order.
my-vegetable-has-exploded Oct 16, 2024
dea673a
fix tests.
my-vegetable-has-exploded Oct 16, 2024
7d03765
fix clippy.
my-vegetable-has-exploded Oct 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

126 changes: 118 additions & 8 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,15 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, SortExpr,
DescribeTable, DmlStatement, Extension, Filter, JoinType, RecursiveQuery, SortExpr,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::utils::{conjunction, split_conjunction};
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::joins::utils::JoinFilter;
use datafusion_physical_plan::joins::IEJoinExec;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_sql::utils::window_expr_common_partition_keys;

Expand Down Expand Up @@ -1063,14 +1066,24 @@ impl DefaultPhysicalPlanner {
session_state.config_options().optimizer.prefer_hash_join;

let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
// there is no equal join condition, use the nested loop join
// TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins`
Arc::new(NestedLoopJoinExec::try_new(
physical_left,
physical_right,
join_filter,
// there is no equal join condition, try to use iejoin or use the nested loop join
if let Some(iejoin) = try_iejoin(
Arc::clone(&physical_left),
Arc::clone(&physical_right),
&join_filter,
join_type,
)?)
session_state.config().target_partitions(),
)? {
iejoin
} else {
// TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins`
Arc::new(NestedLoopJoinExec::try_new(
physical_left,
physical_right,
join_filter,
join_type,
)?)
}
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& !prefer_hash_join
Expand Down Expand Up @@ -1659,6 +1672,103 @@ pub fn create_physical_sort_expr(
})
}

/// Try to create an IEJoin execution plan for a join without equality conditions.
///
/// An IEJoin (inequality join) is applicable when the join filter contains at
/// least two conditions usable as inequality conditions (e.g.
/// `a.x < b.x AND a.y > b.y`). The first two such conditions are handled by
/// [`IEJoinExec`]; any remaining conditions are kept as a residual join filter.
///
/// Returns `Ok(None)` when an IEJoin cannot be used — the join is not an inner
/// join, there is no filter, or fewer than two inequality conditions are found
/// — in which case the caller should fall back to another join strategy (e.g.
/// nested loop join).
pub fn try_iejoin(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    filter: &Option<JoinFilter>,
    join_type: &JoinType,
    target_partitions: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // TODO: support other join types; only Inner join is supported currently
    if join_type != &JoinType::Inner {
        return Ok(None);
    }
    let Some(filter) = filter else {
        return Ok(None);
    };
    // Split the conjunctive filter into its individual conditions
    let mut conditions = split_conjunction(filter.expression());
    // Wrap a single condition expression in a JoinFilter reusing the original
    // filter's column indices and intermediate schema
    let as_join_filter = |expr| {
        JoinFilter::new(
            expr,
            filter.column_indices().to_vec(),
            filter.schema().clone(),
        )
    };
    // Collect (index, condition) for the first two conditions usable as
    // inequality conditions. A condition may need its binary expression
    // swapped (e.g. `b.x > a.x` -> `a.x < b.x`) to match the expected form;
    // the unswapped form is preferred when both are valid.
    let mut inequality_conditions = Vec::with_capacity(2);
    for (index, condition) in conditions.iter().enumerate() {
        if inequality_conditions.len() == 2 {
            break;
        }
        let candidate = as_join_filter(Arc::clone(condition));
        if join_utils::check_inequality_condition(&candidate).is_ok() {
            inequality_conditions.push((index, candidate));
            continue;
        }
        // Only build the swapped variant when the original form is not usable
        let swapped = as_join_filter(join_utils::swap_binary_expr(condition));
        if join_utils::check_inequality_condition(&swapped).is_ok() {
            inequality_conditions.push((index, swapped));
        }
    }
    // IEJoin requires at least two inequality conditions
    if inequality_conditions.len() < 2 {
        return Ok(None);
    }
    // Remove the taken inequality conditions from the remaining conditions,
    // from back to front so earlier indices stay valid
    for (index, _condition) in inequality_conditions.iter().rev() {
        conditions.remove(*index);
    }
    // Any leftover conditions become a residual join filter
    let new_filter = conjunction(conditions).map(as_join_filter);
    let inequality_conditions = inequality_conditions
        .into_iter()
        .map(|(_, condition)| condition)
        .collect::<Vec<_>>();
    Ok(Some(Arc::new(IEJoinExec::try_new(
        left,
        right,
        inequality_conditions,
        new_filter,
        join_type,
        target_partitions,
    )?)))
}

/// Create vector of physical sort expression from a vector of logical expression
pub fn create_physical_sort_exprs(
exprs: &[SortExpr],
Expand Down
54 changes: 54 additions & 0 deletions datafusion/physical-expr/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ use itertools::Itertools;
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableGraph;

/// Combine the given physical expressions into a single conjunctive
/// (`AND`-chained) expression.
///
/// Returns `None` when `exprs` is empty; a single expression is returned
/// unchanged. This is the inverse of [`split_conjunction`].
pub fn conjunction(exprs: Vec<&Arc<dyn PhysicalExpr>>) -> Option<Arc<dyn PhysicalExpr>> {
    exprs
        .into_iter()
        .map(Arc::clone)
        .reduce(|acc, expr| Arc::new(BinaryExpr::new(acc, Operator::And, expr)))
}

/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
///
/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
Expand Down Expand Up @@ -214,6 +221,31 @@ pub fn collect_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<Column> {
columns
}

/// Rewrite every [`Column`] in `expr`, replacing its index according to
/// `mapping` (old index -> new index). Column names are left untouched.
///
/// Returns an error if the expression references a column whose index has no
/// entry in `mapping`.
pub fn map_columns(
    expr: Arc<dyn PhysicalExpr>,
    mapping: &HashMap<usize, usize>,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.transform(|node| {
        // Non-column nodes pass through unchanged
        let Some(column) = node.as_any().downcast_ref::<Column>() else {
            return Ok(Transformed::no(node));
        };
        match mapping.get(&column.index()) {
            Some(&new_index) => {
                let remapped: Arc<dyn PhysicalExpr> =
                    Arc::new(Column::new(column.name(), new_index));
                Ok(Transformed::yes(remapped))
            }
            None => datafusion_common::internal_err!(
                "column index {} not found in mapping",
                column.index()
            ),
        }
    })
    .data()
}

/// Re-assign column indices referenced in predicate according to given schema.
/// This may be helpful when dealing with projections.
pub fn reassign_predicate_columns(
Expand Down Expand Up @@ -547,4 +579,26 @@ pub(crate) mod tests {
assert_eq!(collect_columns(&expr3), expected);
Ok(())
}

#[test]
fn test_map_columns() -> Result<()> {
    // col1 (index 0) and col2 (index 1) have entries in the mapping below;
    // col3 (index 2) deliberately does not.
    let col1 = Arc::new(Column::new("col1", 0));
    let col2 = Arc::new(Column::new("col2", 1));
    let col3 = Arc::new(Column::new("col3", 2));
    let expr = Arc::new(BinaryExpr::new(col1, Operator::Plus, col2)) as _;
    // Remap index 0 -> 2 and index 1 -> 0
    let mapping = HashMap::from([(0, 2), (1, 0)]);
    let mapped = map_columns(expr, &mapping)?;
    // Column names are preserved, only the indices change. The comparison
    // relies on the PartialEq impl between `dyn PhysicalExpr` and `dyn Any`.
    assert_eq!(
        mapped.as_ref(),
        Arc::new(BinaryExpr::new(
            Arc::new(Column::new("col1", 2)),
            Operator::Plus,
            Arc::new(Column::new("col2", 0))
        ))
        .as_any()
    );
    // A column index absent from the mapping is an error
    assert!(map_columns(col3, &mapping).is_err());
    Ok(())
}
}
1 change: 1 addition & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-common-runtime = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
Expand Down
Loading
Loading