Skip to content

Commit

Permalink
fix: Improve error "BroadcastExchange is not supported" (apache#577)
Browse files Browse the repository at this point in the history
* fix: Improve error "BroadcastExchange is not supported"

* Update queries list

* Add more explanations. Fix plan stability regression.

* Make extended explain more user friendly (provide a tree view of plan with info)

* fix ci

* address review comment

* spotless
  • Loading branch information
parthchandra authored Jul 2, 2024
1 parent 0d2fcbc commit 68efa57
Show file tree
Hide file tree
Showing 8 changed files with 14,183 additions and 902 deletions.
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,14 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.verbose.enabled")
.doc(
"When this setting is enabled, Comet will provide a verbose tree representation of " +
"the extended information.")
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explainFallback.enabled")
.doc(
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
Expand Down
14,165 changes: 13,374 additions & 791 deletions spark/inspections/CometTPCDSQueriesList-results.txt

Large diffs are not rendered by default.

712 changes: 628 additions & 84 deletions spark/inspections/CometTPCHQueriesList-results.txt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, isSpark40Plus, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.CometExplainInfo.getActualPlan
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, isSpark40Plus, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
Expand Down Expand Up @@ -222,6 +223,19 @@ class CometSparkSessionExtensions

private def isCometNative(op: SparkPlan): Boolean = op.isInstanceOf[CometNativeExec]

private def explainChildNotNative(op: SparkPlan): String = {
var nonNatives: Seq[String] = Seq()
val actualOp = getActualPlan(op)
actualOp.children.foreach {
case p: SparkPlan =>
if (!isCometNative(p)) {
nonNatives = nonNatives :+ getActualPlan(p).nodeName
}
case _ =>
}
nonNatives.mkString("(", ", ", ")")
}

// spotless:off
/**
* Tries to transform a Spark physical plan into a Comet plan.
Expand Down Expand Up @@ -292,6 +306,10 @@ class CometSparkSessionExtensions
op,
op.children.map(_.asInstanceOf[CometNativeExec].nativeOp): _*)
} else {
withInfo(
op,
s"${op.nodeName} is not native because the following children are not native " +
s"${explainChildNotNative(op)}")
None
}
}
Expand Down Expand Up @@ -445,7 +463,10 @@ class CometSparkSessionExtensions
op

case op: ShuffledHashJoinExec if !op.children.forall(isCometNative(_)) =>
withInfo(op, "ShuffleHashJoin disabled because not all child plans are native")
withInfo(
op,
"ShuffleHashJoin disabled because the following children are not native " +
s"${explainChildNotNative(op)}")
op

case op: BroadcastHashJoinExec
Expand All @@ -469,10 +490,6 @@ class CometSparkSessionExtensions
op
}

case op: BroadcastHashJoinExec if !isCometOperatorEnabled(conf, "broadcast_hash_join") =>
withInfo(op, "BroadcastHashJoin is not enabled")
op

case op: SortMergeJoinExec
if isCometOperatorEnabled(conf, "sort_merge_join") &&
op.children.forall(isCometNative(_)) =>
Expand All @@ -492,12 +509,25 @@ class CometSparkSessionExtensions
case None =>
op
}

case op: SortMergeJoinExec
if isCometOperatorEnabled(conf, "sort_merge_join") &&
!op.children.forall(isCometNative(_)) =>
withInfo(
op,
"SortMergeJoin is not enabled because the following children are not native " +
s"${explainChildNotNative(op)}")
op

case op: SortMergeJoinExec if !isCometOperatorEnabled(conf, "sort_merge_join") =>
withInfo(op, "SortMergeJoin is not enabled")
op

case op: SortMergeJoinExec if !op.children.forall(isCometNative(_)) =>
withInfo(op, "SortMergeJoin disabled because not all child plans are native")
withInfo(
op,
"SortMergeJoin is not enabled because the following children are not native " +
s"${explainChildNotNative(op)}")
op

case c @ CoalesceExec(numPartitions, child)
Expand All @@ -516,7 +546,10 @@ class CometSparkSessionExtensions
c

case op: CoalesceExec if !op.children.forall(isCometNative(_)) =>
withInfo(op, "Coalesce disabled because not all child plans are native")
withInfo(
op,
"Coalesce is not enabled because the following children are not native " +
s"${explainChildNotNative(op)}")
op

case s: TakeOrderedAndProjectExec
Expand Down Expand Up @@ -569,7 +602,10 @@ class CometSparkSessionExtensions
u

case op: UnionExec if !op.children.forall(isCometNative(_)) =>
withInfo(op, "Union disabled because not all child plans are native")
withInfo(
op,
"Union is not enabled because the following children are not native " +
s"${explainChildNotNative(op)}")
op

// For AQE broadcast stage on a Comet broadcast exchange
Expand Down Expand Up @@ -606,22 +642,33 @@ class CometSparkSessionExtensions
if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) {
newPlan
} else {
if (!isCometOperatorEnabled(
conf,
"broadcastExchangeExec") && !isCometBroadCastForceEnabled(conf)) {
withInfo(plan, "Native Broadcast is not enabled")
if (isCometNative(newPlan)) {
val reason =
getCometBroadcastNotEnabledReason(conf).getOrElse("no reason available")
withInfo(plan, s"Broadcast is not enabled: $reason")
}
plan
}
} else {
withInfo(
plan,
s"${plan.nodeName} is not native because the following children are not native " +
s"${explainChildNotNative(plan)}")
plan
}

// this case should be checked only after the previous case checking for a
// child BroadcastExchange has been applied, otherwise that transform
// never gets applied
case op: BroadcastHashJoinExec if !op.children.forall(isCometNative(_)) =>
withInfo(op, "BroadcastHashJoin disabled because not all child plans are native")
withInfo(
op,
"BroadcastHashJoin is not enabled because the following children are not native " +
s"${explainChildNotNative(op)}")
op

case op: BroadcastHashJoinExec if !isCometOperatorEnabled(conf, "broadcast_hash_join") =>
withInfo(op, "BroadcastHashJoin is not enabled")
op

// For AQE shuffle stage on a Comet shuffle exchange
Expand Down Expand Up @@ -714,6 +761,9 @@ class CometSparkSessionExtensions
case op =>
// An operator that is not supported by Comet
op match {
// Broadcast exchange exec is transformed by the parent node. We include
// this case specially here so we do not add a misleading 'info' message
case _: BroadcastExchangeExec => op
case _: CometExec | _: CometBroadcastExchangeExec | _: CometShuffleExchangeExec => op
case o =>
withInfo(o, s"${o.nodeName} is not supported")
Expand Down Expand Up @@ -945,6 +995,20 @@ object CometSparkSessionExtensions extends Logging {
COMET_EXEC_BROADCAST_FORCE_ENABLED.get(conf)
}

private[comet] def getCometBroadcastNotEnabledReason(conf: SQLConf): Option[String] = {
val operator = "broadcastExchangeExec"
if (!isCometOperatorEnabled(conf, "broadcastExchangeExec") && !isCometBroadCastForceEnabled(
conf)) {
Some(
s"$COMET_EXEC_CONFIG_PREFIX.$operator.enabled is not specified and " +
s"${COMET_EXEC_BROADCAST_FORCE_ENABLED.key} is not specified")
} else if (!isSpark34Plus) {
Some("Native broadcast requires Spark 3.4 or newer")
} else {
None
}
}

private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
// TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.
Expand Down
99 changes: 88 additions & 11 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,18 @@ import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
import org.apache.spark.sql.execution.{InputAdapter, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}

import org.apache.comet.CometExplainInfo.getActualPlan

class ExtendedExplainInfo extends ExtendedExplainGenerator {

override def title: String = "Comet"

override def generateExtendedInfo(plan: SparkPlan): String = {
val info = extensionInfo(plan)
info.toSeq.sorted.mkString("\n").trim
}

private def getActualPlan(node: TreeNode[_]): TreeNode[_] = {
node match {
case p: AdaptiveSparkPlanExec => getActualPlan(p.executedPlan)
case p: InputAdapter => getActualPlan(p.child)
case p: QueryStageExec => getActualPlan(p.plan)
case p: WholeStageCodegenExec => getActualPlan(p.child)
case p => p
if (CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.get()) {
generateVerboseExtendedInfo(plan)
} else {
val info = extensionInfo(plan)
info.toSeq.sorted.mkString("\n").trim
}
}

Expand Down Expand Up @@ -81,8 +77,89 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
}
ordered.reverse
}

// generates the extended info in a verbose manner, printing each node along with the
// extended information in a tree display
def generateVerboseExtendedInfo(plan: SparkPlan): String = {
val outString = new StringBuilder()
generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString)
outString.toString()
}

// Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
def generateTreeString(
node: TreeNode[_],
depth: Int,
lastChildren: Seq[Boolean],
indent: Int,
outString: StringBuilder): Unit = {
outString.append(" " * indent)
if (depth > 0) {
lastChildren.init.foreach { isLast =>
outString.append(if (isLast) " " else ": ")
}
outString.append(if (lastChildren.last) "+- " else ":- ")
}

val tagValue = node.getTagValue(CometExplainInfo.EXTENSION_INFO)
val str = if (tagValue.nonEmpty) {
s" ${node.nodeName} [COMET: ${tagValue.get.mkString(", ")}]"
} else {
node.nodeName
}
outString.append(str)
outString.append("\n")

val innerChildrenLocal = node.innerChildren
if (innerChildrenLocal.nonEmpty) {
innerChildrenLocal.init.foreach {
case c @ (_: TreeNode[_]) =>
generateTreeString(
getActualPlan(c),
depth + 2,
lastChildren :+ node.children.isEmpty :+ false,
indent,
outString)
case _ =>
}
generateTreeString(
getActualPlan(innerChildrenLocal.last),
depth + 2,
lastChildren :+ node.children.isEmpty :+ true,
indent,
outString)
}
if (node.children.nonEmpty) {
node.children.init.foreach {
case c @ (_: TreeNode[_]) =>
generateTreeString(
getActualPlan(c),
depth + 1,
lastChildren :+ false,
indent,
outString)
case _ =>
}
node.children.last match {
case c @ (_: TreeNode[_]) =>
generateTreeString(getActualPlan(c), depth + 1, lastChildren :+ true, indent, outString)
case _ =>
}
}
}
}

object CometExplainInfo {
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")

def getActualPlan(node: TreeNode[_]): TreeNode[_] = {
node match {
case p: AdaptiveSparkPlanExec => getActualPlan(p.executedPlan)
case p: InputAdapter => getActualPlan(p.child)
case p: QueryStageExec => getActualPlan(p.plan)
case p: WholeStageCodegenExec => getActualPlan(p.child)
case p => p
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
(
s"SELECT sum(c0), sum(c2) from $table group by c1",
Set(
"HashAggregate is not native because the following children are not native (AQEShuffleRead)",
"Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled",
"AQEShuffleRead is not supported")),
(
Expand All @@ -1554,8 +1555,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
"Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled",
"AQEShuffleRead is not supported",
"make_interval is not supported",
"BroadcastExchange is not supported",
"BroadcastHashJoin disabled because not all child plans are native")))
"HashAggregate is not native because the following children are not native (AQEShuffleRead)",
"Project is not native because the following children are not native (BroadcastHashJoin)",
"BroadcastHashJoin is not enabled because the following children are not native" +
" (BroadcastExchange, Project)")))
.foreach(test => {
val qry = test._1
val expected = test._2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ trait CometTPCQueryListBase
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true",
// Lower bloom filter thresholds to allows us to simulate the plan produced at larger scale.
"spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold" -> "1MB",
"spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold" -> "1MB") {
Expand Down

0 comments on commit 68efa57

Please sign in to comment.