Missing ColumnarToRow when using CometSparkToColumnar #1092

Open
bmorck opened this issue Nov 15, 2024 · 7 comments
Labels
bug Something isn't working

Comments

@bmorck

bmorck commented Nov 15, 2024

Describe the bug

I'm working on some internal benchmarks using Comet 0.3.0 with Spark 3.3 and Iceberg. To support Iceberg, we set the following configs, which insert CometSparkToColumnar nodes above the BatchScan operators:

"spark.comet.sparkToColumnar.enabled" = "true"
"spark.comet.sparkToColumnar.supportedOperatorList" = "BatchScan"

In the following example, a ColumnarToRow is missing between the Project (19) operator and its child CometSortMergeJoin (18), which causes the query to fail. While analyzing the query optimization, I found that the EliminateRedundantTransitions rule removed a CometSparkToColumnar and the ColumnarToRow above it from on top of the BatchScan (20) operator, because the Filter (21) operator above them requires row format.

> CometSort (27)
>    +- CometColumnarExchange (26)
>       +- BroadcastNestedLoopJoin Cross BuildRight (25)
>          :- Project (19)
>          :  +- CometSortMergeJoin (18)
>          :     :- CometSort (6)
>          :     :  +- ShuffleQueryStage (5)
>          :     :     +- CometExchange (4)
>          :     :        +- CometFilter (3)
>          :     :           +- CometSparkToColumnar (2)
>          :     :              +- BatchScan (1)
>          :     +- CometSort (17)
>          :        +- CometExchange (16)
>          :           +- CometFilter (15)
>          :              +- CometHashAggregate (14)
>          :                 +- ShuffleQueryStage (13)
>          :                    +- CometExchange (12)
>          :                       +- CometHashAggregate (11)
>          :                          +- CometProject (10)
>          :                             +- CometFilter (9)
>          :                                +- CometSparkToColumnar (8)
>          :                                   +- BatchScan (7)
>          +- BroadcastQueryStage (24)
>             +- BroadcastExchange (23)
>                +- * Project (22)
>                   +- * Filter (21)
>                      +- BatchScan (20)
> 

When I modify the filter so that it can be converted to native, the query inserts the appropriate ColumnarToRow transitions, as shown below. It's unclear whether this is a bug in Spark's ApplyColumnarRulesAndInsertTransitions rule or something specific to Comet, but it looks like incorrect behavior when using the CometSparkToColumnar node.

>    ColumnarToRow (34)
>    +- CometSort (33)
>       +- AQEShuffleRead (32)
>          +- ShuffleQueryStage (31), Statistics(sizeInBytes=4.2 KiB, rowCount=36)
>             +- CometColumnarExchange (30)
>                +- BroadcastNestedLoopJoin Cross BuildRight (29)
>                   :- Project (21)
>                   :  +- ColumnarToRow (20)
>                   :     +- CometBroadcastHashJoin (19)
>                   :        :- BroadcastQueryStage (8), Statistics(sizeInBytes=319.7 KiB, rowCount=583)
>                   :        :  +- CometBroadcastExchange (7)
>                   :        :     +- AQEShuffleRead (6)
>                   :        :        +- ShuffleQueryStage (5), Statistics(sizeInBytes=409.3 KiB, rowCount=583)
>                   :        :           +- CometExchange (4)
>                   :        :              +- CometFilter (3)
>                   :        :                 +- CometSparkToColumnar (2)
>                   :        :                    +- BatchScan (1)
>                   :        +- CometFilter (18)
>                   :           +- CometHashAggregate (17)
>                   :              +- AQEShuffleRead (16)
>                   :                 +- ShuffleQueryStage (15), Statistics(sizeInBytes=2040.0 B, rowCount=10)
>                   :                    +- CometExchange (14)
>                   :                       +- CometHashAggregate (13)
>                   :                          +- CometProject (12)
>                   :                             +- CometFilter (11)
>                   :                                +- CometSparkToColumnar (10)
>                   :                                   +- BatchScan (9)
>                   +- BroadcastQueryStage (28), Statistics(sizeInBytes=864.0 B, rowCount=36)
>                      +- BroadcastExchange (27)
>                         +- ColumnarToRow (26)
>                            +- CometProject (25)
>                               +- CometFilter (24)
>                                  +- CometSparkToColumnar (23)
>                                     +- BatchScan (22)
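To make the symptom concrete, here is a minimal Python sketch of the invariant the first plan violates: it walks a toy plan tree and reports every parent/child edge where an operator expecting rows sits directly on an operator producing columnar batches (or vice versa) with no transition in between. The `Node` class, the `find_missing_transitions` helper and the row/columnar flags are hypothetical stand-ins, not Spark or Comet APIs, and the two trees are simplified fragments of the plans above with the subtrees under each join collapsed into leaves.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical toy model, not Spark's API: each node records the data format
# it expects from its children and the format it produces.
ROW, COLUMNAR = "row", "columnar"


@dataclass
class Node:
    name: str
    input_format: str   # format this operator expects from its children
    output_format: str  # format this operator produces
    children: List["Node"] = field(default_factory=list)


def find_missing_transitions(node: Node) -> List[str]:
    """Return 'parent <- child' edges whose formats do not match."""
    problems = []
    for child in node.children:
        if child.output_format != node.input_format:
            problems.append(f"{node.name} <- {child.name}")
        problems.extend(find_missing_transitions(child))
    return problems


def columnar_leaf(name: str) -> Node:
    # Collapsed stand-in for a whole columnar (Comet) subtree.
    return Node(name, COLUMNAR, COLUMNAR)


# Fragment of the failing plan: Project (19) sits directly on a Comet join.
failing = Node("Project (19)", ROW, ROW, [
    Node("CometSortMergeJoin (18)", COLUMNAR, COLUMNAR,
         [columnar_leaf("CometSort (6)"), columnar_leaf("CometSort (17)")]),
])

# Matching fragment of the working plan: ColumnarToRow (20) bridges formats.
working = Node("Project (21)", ROW, ROW, [
    Node("ColumnarToRow (20)", COLUMNAR, ROW, [
        Node("CometBroadcastHashJoin (19)", COLUMNAR, COLUMNAR,
             [columnar_leaf("BroadcastQueryStage (8)"),
              columnar_leaf("CometFilter (18)")]),
    ]),
])

print(find_missing_transitions(failing))  # ['Project (19) <- CometSortMergeJoin (18)']
print(find_missing_transitions(working))  # []
```

In this toy model the first fragment is flagged at exactly the Project (19) edge, while in the second fragment ColumnarToRow (20) converts the join output to rows and nothing is reported.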

Steps to reproduce

No response

Expected behavior

All required ColumnarToRow operators are inserted and the query succeeds.

Additional context

No response

@bmorck bmorck added the bug Something isn't working label Nov 15, 2024
@andygrove
Member

I'm in the process of creating the first 0.4.0 release candidate and am uploading the jar files to a Maven staging repository. It may be worth testing with this newer version. I'll post more details once the jars are available.

@andygrove
Member

@bmorck You can download 0.4.0-rc1 jar files from https://repository.apache.org/#nexus-search;quick~org.apache.datafusion

@huaxingao
Contributor

@bmorck Thanks for reporting the bug. When you ran the benchmark, did you use the changes from the upstream Iceberg PR?

@viirya
Member

viirya commented Nov 15, 2024

I slightly updated the description to make the query plan readable.

@viirya
Member

viirya commented Nov 15, 2024

> After analyzing the query optimization, I've found that the EliminateRedundantTransitions rule, removed a CometSparkToColumnar and subsequent ColumnarToRow following the BatchScan (20) operator, due to the subsequent Filter (21) operator requiring row format.

Removing CometSparkToColumnar + ColumnarToRow from the top of BatchScan (20) looks correct if the Filter (21) cannot be converted to CometFilter.

> In the following example, we see that there is a missing ColumnarToRow preceding the Project (19) operator. This results in the query failing.

I'm not sure how it is related to the issue on Project (19).

@bmorck
Author

bmorck commented Nov 15, 2024

@viirya It's not clear to me that it is related to the issue on Project (19); however, I noticed that the appropriate ColumnarToRow operators are inserted when the filter can be converted to native, in which case the EliminateRedundantTransitions rule doesn't remove any nodes. I've seen this occur in other queries as well. Here is another example:

> +- AdaptiveSparkPlan (46)
>    +- == Current Plan ==
>       Sort (26)
>       +- Project (25)
>          +- Filter (24)
>             +- Window (23)
>                +- Sort (22)
>                   +- Exchange (21)
>                      +- Union (20)
>                         :- Project (10)
>                         :  +- Filter (9)
>                         :     +- Window (8)
>                         :        +- CometSort (7)
>                         :           +- ShuffleQueryStage (6)
>                         :              +- CometExchange (5)
>                         :                 +- CometProject (4)
>                         :                    +- CometFilter (3)
>                         :                       +- CometSparkToColumnar (2)
>                         :                          +- BatchScan (1)
>                         +- Project (19)
>                            +- Filter (18)
>                               +- Window (17)
>                                  +- Sort (16)
>                                     +- ShuffleQueryStage (15)
>                                        +- Exchange (14)
>                                           +- Project (13)
>                                              +- Filter (12)
>                                                 +- BatchScan (11)

@huaxingao I haven't ported the PR yet; that's my next step, and perhaps it resolves the issue. I'll report back here if that turns out to be the case.

@andygrove Thanks for providing the new jar! I'll try it out. We are also using an internal fork of Comet (the only current changes enable Spark 3.3 for some of the version-restricted operators), so I will also rebase to include the latest changes.

@viirya
Member

viirya commented Nov 15, 2024

The only place where the Comet planner removes ColumnarToRowExec is when ColumnarToRowExec sits directly on top of CometSparkToColumnarExec. Since that combination is effectively a no-op, it can be removed from the query plan.

Here it was removed from the top of BatchScan (20), which is not even close to Project (19), so I'm wondering how the two are related. 🤔

Do you have an example query we can use to reproduce this?
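The removal viirya describes is a local peephole rewrite. As a minimal Python sketch, assuming a toy plan model (the `Node` class, `eliminate_redundant_pair` and `render` are hypothetical and are not Comet's actual rule), it rewrites ColumnarToRow(CometSparkToColumnar(x)) to x, bottom-up, and leaves every other edge alone. The unnumbered transition nodes above BatchScan (20) are assumed, since they were removed before the final plan was printed, and the broadcast side is simplified by dropping BroadcastQueryStage (24).

```python
from dataclasses import dataclass, field, replace
from typing import List


@dataclass
class Node:
    # Hypothetical toy plan node: an operator name plus its children.
    name: str
    children: List["Node"] = field(default_factory=list)


def eliminate_redundant_pair(node: Node) -> Node:
    """Bottom-up rewrite of ColumnarToRow(CometSparkToColumnar(x)) into x."""
    children = [eliminate_redundant_pair(c) for c in node.children]
    node = replace(node, children=children)
    if node.name.startswith("ColumnarToRow") and len(children) == 1:
        child = children[0]
        if child.name.startswith("CometSparkToColumnar") and len(child.children) == 1:
            # Row -> columnar -> row round trip: drop both transitions.
            return child.children[0]
    return node


def render(node: Node, depth: int = 0) -> List[str]:
    """Indented, one-line-per-operator view of the tree."""
    lines = ["  " * depth + node.name]
    for child in node.children:
        lines.extend(render(child, depth + 1))
    return lines


# Simplified first plan before the rule runs (assumed shape, see lead-in).
plan = Node("BroadcastNestedLoopJoin (25)", [
    Node("Project (19)", [Node("CometSortMergeJoin (18)")]),
    Node("BroadcastExchange (23)", [
        Node("Project (22)", [
            Node("Filter (21)", [
                Node("ColumnarToRow", [
                    Node("CometSparkToColumnar", [Node("BatchScan (20)")]),
                ]),
            ]),
        ]),
    ]),
])

optimized = eliminate_redundant_pair(plan)
print("\n".join(render(optimized)))
```

In this toy model the pair above BatchScan (20) disappears, so Filter (21) reads rows straight from the scan, while the Project (19) -> CometSortMergeJoin (18) edge is left exactly as it was.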
