fix: CometExecRule should handle ShuffleQueryStage and ReusedExchange #186

Merged
viirya merged 5 commits into apache:main from fix_aqe_plan on Mar 13, 2024

Conversation

@viirya viirya (Member) commented Mar 10, 2024

Which issue does this PR close?

Closes #185.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@viirya viirya force-pushed the fix_aqe_plan branch 2 times, most recently from 8026ea6 to 30ded81 on March 10, 2024 at 21:19

@codecov-commenter commented Mar 10, 2024

Codecov Report

Attention: Patch coverage is 54.16667% with 11 lines in your changes missing coverage. Please review.

Project coverage is 33.32%. Comparing base (d069713) to head (5a7f7fe).

Files                                                   Patch %   Lines
...org/apache/comet/CometSparkSessionExtensions.scala   54.54%    5 Missing and 5 partials ⚠️
.../scala/org/apache/comet/serde/QueryPlanSerde.scala   50.00%    0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #186      +/-   ##
============================================
+ Coverage     33.31%   33.32%   +0.01%     
- Complexity      767      769       +2     
============================================
  Files           107      107              
  Lines         35375    35395      +20     
  Branches       7658     7669      +11     
============================================
+ Hits          11784    11795      +11     
- Misses        21144    21146       +2     
- Partials       2447     2454       +7     


allTypes = allTypes.filterNot(Set(14, 17).contains)
if (!isSpark34Plus) {
  // TODO: Remove this once https://github.com/apache/arrow/issues/40038 is fixed
  allTypes = allTypes.filterNot(Set(14).contains)
}

viirya (Member, Author):

Previously these columns failed on Spark 3.4. I just ran the tests with them and they look okay now.

Comment on lines +67 to 70
if (!isSpark34Plus) {
  // TODO: Remove this once https://github.com/apache/arrow/issues/40038 is fixed
  allTypes = allTypes.filterNot(Set(14).contains)
}

viirya (Member, Author):

Besides, after this change a Comet native operator can appear after CometExchange, so it triggers the known bug in Java Arrow on column _14. I exclude that column for Spark 3.2 and 3.3.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) ColumnarToRow
   +- CometProject [_1#556], [_1#556]
      +- ShuffleQueryStage 0
         +- CometExchange hashpartitioning(_13#568, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=838]
            +- CometScan parquet [_1#556,_13#568] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/liangchi/repos/arrow-datafusion-comet/spark/target/tmp/spa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:boolean,_13:string>
+- == Initial Plan ==
   CometProject [_1#556], [_1#556]
   +- CometExchange hashpartitioning(_13#568, 10), REPARTITION_BY_NUM, CometNativeShuffle, [plan_id=830]
      +- CometScan parquet [_1#556,_13#568] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/liangchi/repos/arrow-datafusion-comet/spark/target/tmp/spa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:boolean,_13:string>

    case _ =>
      None
  def transform1(op: SparkPlan): Option[Operator] = {
    val allNativeExec = op.children.map {

Contributor:

I don't see why this change is needed here.

viirya (Member, Author):

They are not UnaryExecNode.
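
For context, here is a minimal sketch (my illustration, not code from this PR) of the point above: in Spark, both ShuffleQueryStageExec and ReusedExchangeExec are leaf nodes, so the plan they wrap is exposed through a field rather than through children, and a match on UnaryExecNode never reaches it. The unwrap helper below is hypothetical.

import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

// Sketch only: `unwrap` is a hypothetical helper, not part of Comet.
def unwrap(op: SparkPlan): Option[SparkPlan] = op match {
  case u: UnaryExecNode         => Some(u.child) // ordinary single-child operator
  case s: ShuffleQueryStageExec => Some(s.plan)  // leaf node: the wrapped exchange is a field
  case r: ReusedExchangeExec    => Some(r.child) // also a leaf node, despite having a `child` field
  case _                        => None
}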

@sunchao sunchao (Member) left a comment

Looks good

// For AQE shuffle stage on a reused Comet shuffle exchange
// Note that we don't need to handle `ReusedExchangeExec` for non-AQE case, because
// the query plan won't be re-optimized/planned in non-AQE mode.
case s @ ShuffleQueryStageExec(
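
For readers following the diff, a hedged sketch of the general shape such case arms can take (my reconstruction, assuming Spark 3.2+'s three-field ShuffleQueryStageExec and Comet's CometShuffleExchangeExec class; the actual rule in this PR may differ):

import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

// Sketch only: `isCometShuffleStage` is a hypothetical helper, not Comet's API.
def isCometShuffleStage(op: SparkPlan): Boolean = op match {
  // AQE shuffle stage sitting directly on a Comet shuffle exchange
  case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
  // AQE shuffle stage on a reused Comet shuffle exchange, as in the quoted comment above
  case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) => true
  case _ => false
}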

sunchao (Member):

curious if we can replace this rule with:

        case ReusedExchangeExec(_, op) => op

?

viirya (Member, Author):

Hmm, then users won't see ReusedExchangeExec anymore in the explain string or the Spark UI. It is useful to know which part of the plan is reused by Spark.

sunchao (Member):

oops you are right

@viirya viirya merged commit 81a641f into apache:main Mar 13, 2024
26 checks passed

@viirya viirya (Member, Author) commented Mar 13, 2024

Merged. Thanks.

@viirya viirya deleted the fix_aqe_plan branch March 13, 2024 18:14
Development

Successfully merging this pull request may close these issues.

ShuffleQueryStage and ReusedExchange are not processed by CometExecRule
5 participants