Skip to content

Commit

Permalink
feat: Support limit operations (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
jurplel authored Feb 12, 2024
1 parent cfa595b commit feffd5b
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 74 deletions.
138 changes: 69 additions & 69 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 42 additions & 2 deletions optd-datafusion-bridge/src/from_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use optd_datafusion_repr::{
plan_nodes::{
BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, ConstantType, Expr, FuncExpr, FuncType,
JoinType, LogOpExpr, LogOpType, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PhysicalAgg,
PhysicalEmptyRelation, PhysicalFilter, PhysicalHashJoin, PhysicalNestedLoopJoin,
PhysicalProjection, PhysicalScan, PhysicalSort, PlanNode, SortOrderExpr, SortOrderType,
PhysicalEmptyRelation, PhysicalFilter, PhysicalHashJoin, PhysicalLimit,
PhysicalNestedLoopJoin, PhysicalProjection, PhysicalScan, PhysicalSort, PlanNode,
SortOrderExpr, SortOrderType,
},
properties::schema::Schema as OptdSchema,
PhysicalCollector,
Expand Down Expand Up @@ -271,6 +272,41 @@ impl OptdPlanContext<'_> {
)
}

#[async_recursion]
async fn conv_from_optd_limit(
&mut self,
node: PhysicalLimit,
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
let child = self.conv_from_optd_plan_node(node.child()).await?;

// Limit skip/fetch expressions are only allowed to be constant int
assert!(node.skip().typ() == OptRelNodeTyp::Constant(ConstantType::UInt64));
// Conversion from u64 -> usize could fail (also the case in into_optd)
let skip = ConstantExpr::from_rel_node(node.skip().into_rel_node())
.unwrap()
.value()
.as_u64()
.try_into()
.unwrap();

assert!(node.fetch().typ() == OptRelNodeTyp::Constant(ConstantType::UInt64));
let fetch = ConstantExpr::from_rel_node(node.fetch().into_rel_node())
.unwrap()
.value()
.as_u64();
let fetch_opt: Option<usize> = if fetch == u64::MAX {
None
} else {
Some(fetch.try_into().unwrap())
};

Ok(
Arc::new(datafusion::physical_plan::limit::GlobalLimitExec::new(
child, skip, fetch_opt,
)) as Arc<dyn ExecutionPlan>,
)
}

#[async_recursion]
async fn conv_from_optd_sort(
&mut self,
Expand Down Expand Up @@ -493,6 +529,10 @@ impl OptdPlanContext<'_> {
Arc::new(datafusion_schema),
)) as Arc<dyn ExecutionPlan>)
}
OptRelNodeTyp::PhysicalLimit => {
self.conv_from_optd_limit(PhysicalLimit::from_rel_node(rel_node).unwrap())
.await
}
typ => unimplemented!("{}", typ),
};
result.with_context(|| format!("when processing {}", rel_node_dbg))
Expand Down
Loading

0 comments on commit feffd5b

Please sign in to comment.