diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md
new file mode 100644
index 000000000..502b35c29
--- /dev/null
+++ b/docs/source/contributor-guide/benchmarking.md
@@ -0,0 +1,62 @@
+
+
+# Comet Benchmarking Guide
+
+To track progress on performance, we regularly run benchmarks derived from TPC-H and TPC-DS. Benchmarking scripts are
+available in the [DataFusion Benchmarks](https://github.com/apache/datafusion-benchmarks) GitHub repository.
+
+Here is an example command for running the benchmarks. This command will need to be adapted based on the Spark
+environment and the location of the data files.
+
+This command assumes that `datafusion-benchmarks` is checked out in a directory parallel to `datafusion-comet`.
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --master "local[*]" \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.memory=64G \
+    --conf spark.executor.cores=16 \
+    --conf spark.cores.max=16 \
+    --conf spark.eventLog.enabled=true \
+    --conf spark.sql.autoBroadcastJoinThreshold=-1 \
+    --jars $COMET_JAR \
+    --conf spark.driver.extraClassPath=$COMET_JAR \
+    --conf spark.executor.extraClassPath=$COMET_JAR \
+    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+    --conf spark.comet.enabled=true \
+    --conf spark.comet.exec.enabled=true \
+    --conf spark.comet.exec.all.enabled=true \
+    --conf spark.comet.cast.allowIncompatible=true \
+    --conf spark.comet.explainFallback.enabled=true \
+    --conf spark.comet.parquet.io.enabled=false \
+    --conf spark.comet.batchSize=8192 \
+    --conf spark.comet.columnar.shuffle.enabled=false \
+    --conf spark.comet.exec.shuffle.enabled=true \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    --conf spark.sql.adaptive.coalescePartitions.enabled=false \
+    --conf spark.comet.shuffle.enforceMode.enabled=true \
+    ../datafusion-benchmarks/runners/datafusion-comet/tpcbench.py \
+    --benchmark tpch \
+    --data /mnt/bigdata/tpch/sf100-parquet/ \
+    --queries ../datafusion-benchmarks/tpch/queries
+```
+
+Comet performance can be compared to regular Spark performance by running the benchmark twice, once with
+`spark.comet.enabled` set to `true` and once with it set to `false`.
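+
+For example, the baseline Spark run can reuse the same command with Comet disabled. This is only a sketch; the
+`$COMET_JAR` variable, memory settings, and data paths are the same assumptions as in the example above:
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --master "local[*]" \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.memory=64G \
+    --conf spark.eventLog.enabled=true \
+    --conf spark.sql.autoBroadcastJoinThreshold=-1 \
+    --jars $COMET_JAR \
+    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+    --conf spark.comet.enabled=false \
+    ../datafusion-benchmarks/runners/datafusion-comet/tpcbench.py \
+    --benchmark tpch \
+    --data /mnt/bigdata/tpch/sf100-parquet/ \
+    --queries ../datafusion-benchmarks/tpch/queries
+```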
\ No newline at end of file
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 9066ce756..819f72014 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -47,6 +47,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
    Supported Data Types
    Configuration Settings
    Compatibility Guide
+   Tuning Guide
 
 .. _toc.contributor-guide-links:
 .. toctree::
@@ -57,6 +58,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
    Comet Plugin Overview
    Development Guide
    Debugging Guide
+   Benchmarking Guide
    Profiling Native Code
    Github and Issue Tracker
diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
new file mode 100644
index 000000000..01fa7bdbe
--- /dev/null
+++ b/docs/source/user-guide/tuning.md
@@ -0,0 +1,60 @@
+
+
+# Tuning Guide
+
+Comet provides some tuning options to help you get the best performance from your queries.
+
+## Shuffle
+
+Comet provides its own shuffle implementation, which can improve the performance of your queries.
+The following sections describe the shuffle options available in Comet.
+
+To enable Comet shuffle, set the following configuration in your Spark configuration:
+
+```
+spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
+spark.comet.exec.shuffle.enabled=true
+```
+
+`spark.shuffle.manager` is a static Spark configuration and cannot be changed at runtime;
+it must be set before the Spark context is created. Comet shuffle itself can be enabled or
+disabled at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
+When it is disabled, Comet falls back to the default Spark shuffle manager.
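+
+For example, both settings can be passed on the command line when submitting an application. This is only a sketch;
+`$COMET_JAR` and `your-app.jar` are placeholders for the Comet jar and your application jar:
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --jars $COMET_JAR \
+    --conf spark.driver.extraClassPath=$COMET_JAR \
+    --conf spark.executor.extraClassPath=$COMET_JAR \
+    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+    --conf spark.comet.enabled=true \
+    --conf spark.comet.exec.enabled=true \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    --conf spark.comet.exec.shuffle.enabled=true \
+    your-app.jar
+```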
+
+### Columnar Shuffle
+
+By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses columnar shuffle
+to improve the performance of shuffle operations. Columnar shuffle supports HashPartitioning,
+RoundRobinPartitioning, RangePartitioning, and SinglePartitioning.
+
+Columnar shuffle can be disabled by setting `spark.comet.columnar.shuffle.enabled` to `false`.
+
+### Native Shuffle
+
+Comet also provides a fully native shuffle implementation that can further improve performance.
+To enable native shuffle, set `spark.comet.columnar.shuffle.enabled` to `false` while keeping
+`spark.comet.exec.shuffle.enabled` set to `true`, as in the sketch below.
+
+Native shuffle only supports HashPartitioning and SinglePartitioning.
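+
+A minimal sketch of the resulting settings (again with `your-app.jar` as a placeholder):
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    --conf spark.comet.exec.shuffle.enabled=true \
+    --conf spark.comet.columnar.shuffle.enabled=false \
+    your-app.jar
+```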
+
+
+
+
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index f9059d0ba..85a19f55c 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
@@ -575,11 +575,13 @@ class CometSparkSessionExtensions
         // exchange. It is only used for Comet native execution. We only transform Spark broadcast
         // exchange to Comet broadcast exchange if its downstream is a Comet native plan or if the
         // broadcast exchange is forced to be enabled by Comet config.
+        // Note that `CometBroadcastExchangeExec` is only supported for Spark 3.4+.
         case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
           val newChildren = plan.children.map {
             case b: BroadcastExchangeExec
                 if isCometNative(b.child) &&
-                  isCometOperatorEnabled(conf, "broadcastExchangeExec") =>
+                  isCometOperatorEnabled(conf, "broadcastExchangeExec") &&
+                  isSpark34Plus => // Spark 3.4+ only
               QueryPlanSerde.operator2Proto(b) match {
                 case Some(nativeOp) =>
                   val cometOp = CometBroadcastExchangeExec(b, b.child)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 06c5898f7..7bd34debb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -238,13 +238,13 @@ case class CometBroadcastExchangeExec(originalPlan: SparkPlan, child: SparkPlan)
     obj match {
       case other: CometBroadcastExchangeExec =>
         this.originalPlan == other.originalPlan &&
-        this.output == other.output && this.child == other.child
+        this.child == other.child
       case _ =>
        false
     }
   }
 
-  override def hashCode(): Int = Objects.hashCode(output, child)
+  override def hashCode(): Int = Objects.hashCode(child)
 
   override def stringArgs: Iterator[Any] = Iterator(output, child)
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index ad07ff0e2..63587af32 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -406,14 +406,14 @@ case class CometProjectExec(
     obj match {
       case other: CometProjectExec =>
         this.projectList == other.projectList &&
-        this.output == other.output && this.child == other.child &&
+        this.child == other.child &&
         this.serializedPlanOpt == other.serializedPlanOpt
       case _ =>
        false
     }
   }
 
-  override def hashCode(): Int = Objects.hashCode(projectList, output, child)
+  override def hashCode(): Int = Objects.hashCode(projectList, child)
 
   override protected def outputExpressions: Seq[NamedExpression] = projectList
 }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 8f022988f..2e1444281 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -38,8 +38,9 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
 import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.{col, date_add, expr, lead, sum}
@@ -62,6 +63,40 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("ReusedExchangeExec should work on CometBroadcastExchangeExec") {
+    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4")
+    withSQLConf(
+      CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath { path =>
+        spark
+          .range(5)
+          .withColumn("p", $"id" % 2)
+          .write
+          .mode("overwrite")
+          .partitionBy("p")
+          .parquet(path.toString)
+        withTempView("t") {
+          spark.read.parquet(path.toString).createOrReplaceTempView("t")
+          val df = sql("""
+              |SELECT t1.id, t2.id, t3.id
+              |FROM t AS t1
+              |JOIN t AS t2 ON t2.id = t1.id
+              |JOIN t AS t3 ON t3.id = t2.id
+              |WHERE t1.p = 1 AND t2.p = 1 AND t3.p = 1
+              |""".stripMargin)
+          val reusedPlan = ReuseExchangeAndSubquery.apply(df.queryExecution.executedPlan)
+          val reusedExchanges = collect(reusedPlan) { case r: ReusedExchangeExec =>
+            r
+          }
+          assert(reusedExchanges.size == 1)
+          assert(reusedExchanges.head.child.isInstanceOf[CometBroadcastExchangeExec])
+        }
+      }
+    }
+  }
+
   test("CometShuffleExchangeExec logical link should be correct") {
     withTempView("v") {
       spark.sparkContext