Skip to content

Commit

Permalink
test: Reduce test time spent in CometShuffleSuite (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao authored Feb 20, 2024
1 parent db79910 commit fb88650
Showing 1 changed file with 33 additions and 35 deletions.
68 changes: 33 additions & 35 deletions spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,11 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla

protected val numElementsForceSpillThreshold: Int = 10

protected val encryptionEnabled: Boolean = false

override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
conf
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString)
.set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString)
.set("spark.io.encryption.enabled", encryptionEnabled.toString)
}

protected val asyncShuffleEnable: Boolean
Expand Down Expand Up @@ -780,40 +777,41 @@ class DisableAQECometAsyncShuffleSuite extends CometShuffleSuiteBase {
protected val adaptiveExecutionEnabled: Boolean = false
}

/**
* This suite tests the Comet shuffle encryption. Because the encryption configuration can only be
* set in SparkConf at the beginning, we need to create a separate suite for encryption.
*/
class CometShuffleEncryptionSuite extends CometShuffleSuiteBase {
override protected val adaptiveExecutionEnabled: Boolean = true

override protected val asyncShuffleEnable: Boolean = false

override protected val encryptionEnabled: Boolean = true
}

class CometAsyncShuffleEncryptionSuite extends CometShuffleSuiteBase {
override protected val adaptiveExecutionEnabled: Boolean = true

override protected val asyncShuffleEnable: Boolean = true

override protected val encryptionEnabled: Boolean = true
}

class DisableAQECometShuffleEncryptionSuite extends CometShuffleSuiteBase {
override protected val adaptiveExecutionEnabled: Boolean = false

override protected val asyncShuffleEnable: Boolean = false

override protected val encryptionEnabled: Boolean = true
}

class DisableAQECometAsyncShuffleEncryptionSuite extends CometShuffleSuiteBase {
override protected val adaptiveExecutionEnabled: Boolean = false
class CometShuffleEncryptionSuite extends CometTestBase {
import testImplicits._

override protected val asyncShuffleEnable: Boolean = true
override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
conf.set("spark.io.encryption.enabled", "true")
}

override protected val encryptionEnabled: Boolean = true
test("comet columnar shuffle with encryption") {
Seq(10, 201).foreach { numPartitions =>
Seq(true, false).foreach { dictionaryEnabled =>
Seq(true, false).foreach { asyncEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)

(1 until 10).map(i => $"_$i").foreach { col =>
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncEnabled.toString) {
readParquetFile(path.toString) { df =>
val shuffled = df
.select($"_1")
.repartition(numPartitions, col)
checkSparkAnswer(shuffled)
}
}
}
}
}
}
}
}
}

class CometShuffleManagerSuite extends CometTestBase {
Expand Down

0 comments on commit fb88650

Please sign in to comment.