fix: CometExecRule should handle ShuffleQueryStage and ReusedExchange #186
Conversation
Force-pushed from 8026ea6 to 30ded81.
Codecov Report

Attention: Patch coverage is

Additional details and impacted files

@@            Coverage Diff             @@
##              main     #186     +/-   ##
============================================
+ Coverage    33.31%   33.32%   +0.01%
- Complexity     767      769       +2
============================================
  Files          107      107
  Lines        35375    35395      +20
  Branches      7658     7669      +11
============================================
+ Hits         11784    11795      +11
- Misses       21144    21146       +2
- Partials      2447     2454       +7

☔ View full report in Codecov by Sentry.
allTypes = allTypes.filterNot(Set(14, 17).contains)
if (!isSpark34Plus) {
  // TODO: Remove this once https://github.com/apache/arrow/issues/40038 is fixed
  allTypes = allTypes.filterNot(Set(14).contains)
}
Previously these columns failed on Spark 3.4. I just ran the tests with them and they look okay now.
if (!isSpark34Plus) {
  // TODO: Remove this once https://github.com/apache/arrow/issues/40038 is fixed
  allTypes = allTypes.filterNot(Set(14).contains)
}
Besides, after this change, a Comet native operator can come after CometExchange, which triggers the known bug in Java Arrow on column _14. I exclude that column for Spark 3.2 and 3.3.
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) ColumnarToRow
   +- CometProject [_1#556], [_1#556]
      +- ShuffleQueryStage 0
         +- CometExchange hashpartitioning(_13#568, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=838]
            +- CometScan parquet [_1#556,_13#568] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/liangchi/repos/arrow-datafusion-comet/spark/target/tmp/spa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:boolean,_13:string>
+- == Initial Plan ==
   CometProject [_1#556], [_1#556]
   +- CometExchange hashpartitioning(_13#568, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=830]
      +- CometScan parquet [_1#556,_13#568] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/liangchi/repos/arrow-datafusion-comet/spark/target/tmp/spa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:boolean,_13:string>
spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
  case _ =>
    None

def transform1(op: SparkPlan): Option[Operator] = {
  val allNativeExec = op.children.map {
I don't see why this change is needed here.
They are not UnaryExecNode.
Looks good
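The point about `UnaryExecNode` can be sketched as follows. This is a simplified, self-contained illustration, not the actual Comet code: the `Plan`, `NativeLeaf`, `JvmLeaf`, and `Stage` types are hypothetical stand-ins for Spark's `SparkPlan` hierarchy. Because stage nodes like `ShuffleQueryStageExec` are not `UnaryExecNode`, the rule inspects `children` generically rather than assuming a single `child` field:

```scala
// Hypothetical stand-ins for Spark's SparkPlan hierarchy.
sealed trait Plan { def children: Seq[Plan] }
case class NativeLeaf(name: String) extends Plan { val children: Seq[Plan] = Nil }
case class JvmLeaf(name: String) extends Plan { val children: Seq[Plan] = Nil }
case class Stage(children: Seq[Plan]) extends Plan

// Mirrors the shape of transform1: try to convert each child, and succeed
// only if every child could be converted to a native operator.
def transform1(op: Plan): Option[Seq[String]] = {
  val converted = op.children.map {
    case NativeLeaf(n) => Some(n)
    case _             => None
  }
  if (converted.forall(_.isDefined)) Some(converted.flatten) else None
}
```

Mapping over `children` covers unary, binary, and stage nodes uniformly, which is why the change does not need a separate case per arity.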
// For AQE shuffle stage on a reused Comet shuffle exchange
// Note that we don't need to handle `ReusedExchangeExec` for the non-AQE case, because
// the query plan won't be re-optimized/planned in non-AQE mode.
case s @ ShuffleQueryStageExec(
curious if we can replace this rule with:
case ReusedExchangeExec(_, op) => op
?
Hmm, then users won't see ReusedExchangeExec anymore in the explain string or the Spark UI. It is useful to know which part of the plan is reused by Spark.
oops you are right
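The trade-off discussed above can be sketched with a minimal pattern match. This is a hypothetical, self-contained sketch, not the real rule: `CometExchange`, `ReusedExchange`, and `ShuffleQueryStage` stand in for the actual Spark classes. Matching the stage with a nested reuse node, instead of rewriting `ReusedExchangeExec(_, op) => op`, keeps `ReusedExchangeExec` in the plan tree so the reuse remains visible in explain output:

```scala
// Hypothetical stand-ins for the real Spark plan classes.
sealed trait Plan
case class CometExchange(id: Int) extends Plan
case class ReusedExchange(child: Plan) extends Plan
case class ShuffleQueryStage(stageId: Int, plan: Plan) extends Plan

def describe(op: Plan): String = op match {
  // AQE shuffle stage wrapping a reused Comet shuffle exchange: handle it
  // without unwrapping, so the reuse node stays in the plan.
  case ShuffleQueryStage(_, ReusedExchange(CometExchange(_))) =>
    "reused Comet shuffle (ReusedExchange kept in plan)"
  case ShuffleQueryStage(_, CometExchange(_)) =>
    "Comet shuffle"
  case _ =>
    "other"
}
```

Unwrapping the reuse node up front would be simpler, but then nothing in the final plan records that the exchange was shared, which is exactly the information the explain string and the Spark UI surface.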
Merged. Thanks.
Which issue does this PR close?
Closes #185.
Rationale for this change
What changes are included in this PR?
How are these changes tested?