Skip to content

Commit

Permalink
upmerge
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed May 18, 2024
2 parents 7709193 + ec8da30 commit 2dcc472
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 7 deletions.
62 changes: 62 additions & 0 deletions docs/source/contributor-guide/benchmarking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Comet Benchmarking Guide

To track progress on performance, we regularly run benchmarks derived from TPC-H and TPC-DS. Benchmarking scripts are
available in the [DataFusion Benchmarks](https://github.com/apache/datafusion-benchmarks) GitHub repository.

Here is an example command for running the benchmarks. This command will need to be adapted based on the Spark
environment and location of data files.

This command assumes that `datafusion-benchmarks` is checked out in a parallel directory to `datafusion-comet`.

```shell
$SPARK_HOME/bin/spark-submit \
--master "local[*]" \
--conf spark.driver.memory=8G \
--conf spark.executor.memory=64G \
--conf spark.executor.cores=16 \
--conf spark.cores.max=16 \
--conf spark.eventLog.enabled=true \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--jars $COMET_JAR \
--conf spark.driver.extraClassPath=$COMET_JAR \
--conf spark.executor.extraClassPath=$COMET_JAR \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true \
--conf spark.comet.cast.allowIncompatible=true \
--conf spark.comet.explainFallback.enabled=true \
--conf spark.comet.parquet.io.enabled=false \
--conf spark.comet.batchSize=8192 \
--conf spark.comet.columnar.shuffle.enabled=false \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.sql.adaptive.coalescePartitions.enabled=false \
--conf spark.comet.shuffle.enforceMode.enabled=true \
../datafusion-benchmarks/runners/datafusion-comet/tpcbench.py \
--benchmark tpch \
--data /mnt/bigdata/tpch/sf100-parquet/ \
--queries ../datafusion-benchmarks/tpch/queries
```

Comet performance can be compared to regular Spark performance by running the benchmark twice, once with
`spark.comet.enabled` set to `true` and once with it set to `false`.
2 changes: 2 additions & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
Supported Data Types <user-guide/datatypes>
Configuration Settings <user-guide/configs>
Compatibility Guide <user-guide/compatibility>
Tuning Guide <user-guide/tuning>

.. _toc.contributor-guide-links:
.. toctree::
Expand All @@ -57,6 +58,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
Comet Plugin Overview <contributor-guide/plugin_overview>
Development Guide <contributor-guide/development>
Debugging Guide <contributor-guide/debugging>
Benchmarking Guide <contributor-guide/benchmarking>
Profiling Native Code <contributor-guide/profiling_native_code>
Github and Issue Tracker <https://github.com/apache/datafusion-comet>

Expand Down
60 changes: 60 additions & 0 deletions docs/source/user-guide/tuning.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Tuning Guide

Comet provides some tuning options to help you get the best performance from your queries.


## Shuffle

Comet provides Comet shuffle features that can be used to improve the performance of your queries.
The following sections describe the different shuffle options available in Comet.

To enable Comet shuffle, set the following configuration in your Spark configuration:

```
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
spark.comet.exec.shuffle.enabled=true
```

`spark.shuffle.manager` is a Spark static configuration which cannot be changed at runtime.
It must be set before the Spark context is created. You can enable or disable Comet shuffle
at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
Once it is disabled, Comet will fallback to the default Spark shuffle manager.

### Columnar Shuffle

By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses columnar shuffle
to improve the performance of shuffle operations. Columnar shuffle supports HashPartitioning,
RoundRobinPartitioning, RangePartitioning and SinglePartitioning.

Columnar shuffle can be disabled by setting `spark.comet.columnar.shuffle.enabled` to `false`.

### Native Shuffle

Comet also provides a fully native shuffle implementation that can be used to improve the performance.
To enable native shuffle, just disable `spark.comet.columnar.shuffle.enabled`.

Native shuffle only supports HashPartitioning and SinglePartitioning.





Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
Expand Down Expand Up @@ -575,11 +575,13 @@ class CometSparkSessionExtensions
// exchange. It is only used for Comet native execution. We only transform Spark broadcast
// exchange to Comet broadcast exchange if its downstream is a Comet native plan or if the
// broadcast exchange is forced to be enabled by Comet config.
// Note that `CometBroadcastExchangeExec` is only supported for Spark 3.4+.
case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
val newChildren = plan.children.map {
case b: BroadcastExchangeExec
if isCometNative(b.child) &&
isCometOperatorEnabled(conf, "broadcastExchangeExec") =>
isCometOperatorEnabled(conf, "broadcastExchangeExec") &&
isSpark34Plus => // Spark 3.4+ only
QueryPlanSerde.operator2Proto(b) match {
case Some(nativeOp) =>
val cometOp = CometBroadcastExchangeExec(b, b.child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,13 @@ case class CometBroadcastExchangeExec(originalPlan: SparkPlan, child: SparkPlan)
obj match {
case other: CometBroadcastExchangeExec =>
this.originalPlan == other.originalPlan &&
this.output == other.output && this.child == other.child
this.child == other.child
case _ =>
false
}
}

override def hashCode(): Int = Objects.hashCode(output, child)
override def hashCode(): Int = Objects.hashCode(child)

override def stringArgs: Iterator[Any] = Iterator(output, child)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,14 +406,14 @@ case class CometProjectExec(
obj match {
case other: CometProjectExec =>
this.projectList == other.projectList &&
this.output == other.output && this.child == other.child &&
this.child == other.child &&
this.serializedPlanOpt == other.serializedPlanOpt
case _ =>
false
}
}

override def hashCode(): Int = Objects.hashCode(projectList, output, child)
override def hashCode(): Int = Objects.hashCode(projectList, child)

override protected def outputExpressions: Seq[NamedExpression] = projectList
}
Expand Down
37 changes: 36 additions & 1 deletion spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, date_add, expr, lead, sum}
Expand All @@ -62,6 +63,40 @@ class CometExecSuite extends CometTestBase {
}
}

test("ReusedExchangeExec should work on CometBroadcastExchangeExec") {
assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
withSQLConf(
CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.USE_V1_SOURCE_LIST.key -> "") {
withTempPath { path =>
spark
.range(5)
.withColumn("p", $"id" % 2)
.write
.mode("overwrite")
.partitionBy("p")
.parquet(path.toString)
withTempView("t") {
spark.read.parquet(path.toString).createOrReplaceTempView("t")
val df = sql("""
|SELECT t1.id, t2.id, t3.id
|FROM t AS t1
|JOIN t AS t2 ON t2.id = t1.id
|JOIN t AS t3 ON t3.id = t2.id
|WHERE t1.p = 1 AND t2.p = 1 AND t3.p = 1
|""".stripMargin)
val reusedPlan = ReuseExchangeAndSubquery.apply(df.queryExecution.executedPlan)
val reusedExchanges = collect(reusedPlan) { case r: ReusedExchangeExec =>
r
}
assert(reusedExchanges.size == 1)
assert(reusedExchanges.head.child.isInstanceOf[CometBroadcastExchangeExec])
}
}
}
}

test("CometShuffleExchangeExec logical link should be correct") {
withTempView("v") {
spark.sparkContext
Expand Down

0 comments on commit 2dcc472

Please sign in to comment.