feat: Add native shuffle and columnar shuffle #30

Merged 2 commits on Feb 16, 2024

Changes from 1 commit
106 changes: 105 additions & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
@@ -77,7 +77,7 @@ object CometConf {
"The amount of additional memory to be allocated per executor process for Comet, in MiB. " +
"This config is optional. If this is not specified, it will be set to " +
"`spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
"This is memory that accounts for things like Comet native execution, etc.")
"This is memory that accounts for things like Comet native execution, Comet shuffle, etc.")
.bytesConf(ByteUnit.MiB)
.createOptional

@@ -119,6 +119,110 @@ object CometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
.doc("Whether to enable Comet native shuffle. By default, this config is false. " +
"Note that this requires setting 'spark.shuffle.manager' to" +
viirya marked this conversation as resolved.
Show resolved Hide resolved
"'org.apache.spark.sql.comet.execution.CometShuffleManager'. 'spark.shuffle.manager' must " +
viirya marked this conversation as resolved.
Show resolved Hide resolved
"be set before starting the Spark application and cannot be changed during the application")
viirya marked this conversation as resolved.
Show resolved Hide resolved
.booleanConf
.createWithDefault(false)

val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
"spark.comet.columnar.shuffle.enabled")
.doc(
"Force Comet to only use Arrow-based shuffle for CometScan and Spark regular operators. " +
viirya marked this conversation as resolved.
Show resolved Hide resolved
"If this is enabled, Comet native shuffle will not be enabled but only Arrow shuffle. " +
"By default, this config is false.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
.doc(
"The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.")
.stringConf
.createWithDefault("zstd")

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
"spark.comet.columnar.shuffle.async.enabled")
.doc(
"Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config " +
"is false.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.async.thread.num")
.doc("Number of threads used for Comet async columnar shuffle per shuffle task. " +
"By default, this config is 3. Note that more threads means more memory requirement to " +
"buffer shuffle data before flushing to disk. Also, more threads may not always " +
"improve performance, and should be set based on the number of cores available.")
.intConf
.createWithDefault(3)

val COMET_EXEC_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
conf("spark.comet.columnar.shuffle.async.max.thread.num")
.doc("Maximum number of threads on an executor used for Comet async columnar shuffle. " +
"By default, this config is 100. This is the upper bound of total number of shuffle " +
"threads per executor. In other words, if the number of cores * the number of shuffle " +
"threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than " +
viirya marked this conversation as resolved.
Show resolved Hide resolved
"than this config. Comet will use this config as the number of shuffle threads per " +
"executor instead.")
.intConf
.createWithDefault(100)
}
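// Illustration (not part of this file): how the two async thread configs interact,
// with hypothetical values. With 16 cores per executor and thread.num = 3,
// 16 * 3 = 48 <= 100, so 48 shuffle threads are used per executor; with 64 cores,
// 64 * 3 = 192 would be capped at max.thread.num = 100.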

val COMET_EXEC_SHUFFLE_SPILL_THRESHOLD: ConfigEntry[Int] =
conf("spark.comet.columnar.shuffle.spill.threshold")
.doc(
"Number of rows to be spilled used for Comet columnar shuffle. " +
viirya marked this conversation as resolved.
Show resolved Hide resolved
"For every configured number of rows, a new spill file will be created. " +
"Higher value means more memory requirement to buffer shuffle data before " +
"flushing to disk. As Comet uses Arrow-based shuffle which is columnar format, " +
"higher value usually helps to improve shuffle data compression ratio. This is " +
"internal config for testing purpose or advanced tuning. By default, " +
"this config is Int.Max.")
.internal()
.intConf
.createWithDefault(Int.MaxValue)

val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
conf("spark.comet.columnar.shuffle.memorySize")
.doc(
"The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " +
"Note that this config is only used when `spark.comet.columnar.shuffle.enabled` is " +
"true. Once allocated memory size reaches this config, the current batch will be " +
"flushed to disk immediately. If this is not configured, Comet will use " +
"`spark.comet.shuffle.memory.factor` * `spark.comet.memoryOverhead` as " +
"shuffle memory size. If final calculated value is larger than Comet memory " +
"overhead, Comet will use Comet memory overhead as shuffle memory size.")
.bytesConf(ByteUnit.MiB)
.createOptional

val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
conf("spark.comet.columnar.shuffle.memory.factor")
.doc(
"Fraction of Comet memory to be allocated per executor process for Comet shuffle. " +
"Comet memory size is specified by `spark.comet.memoryOverhead` or " +
"calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
"By default, this config is 1.0.")
.doubleConf
.checkValue(
factor => factor > 0,
"Ensure that Comet shuffle memory overhead factor is a double greater than 0")
.createWithDefault(1.0)

val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
"spark.comet.shuffle.preferDictionary.ratio")
.doc("The ratio of total values to distinct values in a string column to decide whether to " +
"prefer dictionary encoding when shuffling the column. If the ratio is higher than " +
"this config, dictionary encoding will be used on shuffling string column. This config " +
"is effective if it is higher than 1.0. By default, this config is 10.0. Note that this " +
"config is only used when 'spark.comet.columnar.shuffle.enabled' is true.")
.doubleConf
.createWithDefault(10.0)
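// Illustration (not part of this file): a string column with 1,000,000 values of
// which 50,000 are distinct has a ratio of 20.0; 20.0 > 10.0, so under the default
// setting the column would be shuffled with dictionary encoding.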

val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.debug.enabled")
.doc(
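Taken together, the new configs are enabled roughly as follows. A minimal sketch of setting up both shuffle modes when building a Spark application; this snippet is illustrative rather than part of the PR, and it assumes `COMET_EXEC_CONFIG_PREFIX` resolves to `spark.comet.exec`:

import org.apache.spark.SparkConf

// 'spark.shuffle.manager' must be set before the application starts and cannot
// be changed while it is running.
val conf = new SparkConf()
  .set(
    "spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .set("spark.comet.exec.shuffle.enabled", "true") // native shuffle
// To force Arrow-based columnar shuffle instead of native shuffle:
//   conf.set("spark.comet.columnar.shuffle.enabled", "true")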
104 changes: 104 additions & 0 deletions ArrowReaderIterator.scala (new file)
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.execution.shuffle

import java.nio.channels.ReadableByteChannel

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometConf
import org.apache.comet.vector.{NativeUtil, StreamReader}

class ArrowReaderIterator(channel: ReadableByteChannel) extends Iterator[ColumnarBatch] {

private val nativeUtil = new NativeUtil

private val maxBatchSize = CometConf.COMET_BATCH_SIZE.get(SQLConf.get)

private val reader = StreamReader(channel)
// Index of the next row to slice from the current underlying batch; -1 when exhausted.
private var currentIdx = -1
private var batch = nextBatch()
// Fully consumed underlying batch, kept open until the next read (see `next`).
private var previousBatch: ColumnarBatch = null
// The sliced batch most recently returned to the caller.
private var currentBatch: ColumnarBatch = null

override def hasNext: Boolean = {
if (batch.isDefined) {
return true
}

batch = nextBatch()
if (batch.isEmpty) {
return false
}
true
}

override def next(): ColumnarBatch = {
if (!hasNext) {
throw new NoSuchElementException
}

val nextBatch = batch.get
val batchRows = nextBatch.numRows()
val numRows = Math.min(batchRows - currentIdx, maxBatchSize)

// Release the previous sliced batch.
// If it is not released, the Arrow library will complain about a memory leak
// when the reader is closed.
if (currentBatch != null) {
// Close plain arrays in the previous sliced batch.
// The dictionary arrays will be closed when closing the entire batch.
currentBatch.close()
}

currentBatch = nativeUtil.takeRows(nextBatch, currentIdx, numRows)
currentIdx += numRows

if (currentIdx == batchRows) {
// We cannot close the batch here, because if there is a dictionary array in the batch,
// the dictionary array will be closed immediately, and the returned sliced batch will
// be invalid.
previousBatch = batch.get

batch = None
currentIdx = -1
}

currentBatch
}

private def nextBatch(): Option[ColumnarBatch] = {
if (previousBatch != null) {
previousBatch.close()
previousBatch = null
}
currentIdx = 0
reader.nextBatch()
}

def close(): Unit =
synchronized {
if (currentBatch != null) {
currentBatch.close()
}
reader.close()
}
}
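For context, a minimal sketch of how ArrowReaderIterator might be driven. The file name and consuming loop are hypothetical, assuming the channel carries an Arrow IPC stream produced by the Comet shuffle writer:

import java.nio.channels.Channels
import java.nio.file.{Files, Paths}

val channel = Channels.newChannel(Files.newInputStream(Paths.get("shuffle.data")))
val iter = new ArrowReaderIterator(channel)
try {
  var totalRows = 0L
  while (iter.hasNext) {
    // Each batch is a slice of at most COMET_BATCH_SIZE rows. Slices are owned
    // and released by the iterator, so the caller must not close them.
    totalRows += iter.next().numRows()
  }
  println(s"Read $totalRows rows")
} finally {
  iter.close()
}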