docs: Add some documentation explaining how shuffle works #1148

Merged Dec 6, 2024

26 changes: 24 additions & 2 deletions docs/source/contributor-guide/plugin_overview.md

In the native code there is a `PhysicalPlanner` struct (in `planner.rs`) which converts the serialized plan into an
Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized physical operators and expressions to
override the DataFusion versions to ensure compatibility with Apache Spark.

The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to
`CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan,
a Spark exchange, or another native plan.
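
To make this flow concrete, the sketch below shows the general shape of an iterator contract that native code could drive over JNI: the native side supplies addresses of pre-allocated Arrow C Data Interface structs and the JVM exports the next batch into them. The class name, signature, and return convention are illustrative, not Comet's actual `CometBatchIterator` API.

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical sketch of a JNI-driven batch iterator: native code calls
// next() with addresses of pre-allocated ArrowArray/ArrowSchema structs,
// and the JVM exports the next batch into them via Arrow FFI.
class BatchIteratorSketch(input: Iterator[ColumnarBatch]) {

  /**
   * Export the next batch into the given Arrow C Data Interface structs.
   *
   * @param arrayAddrs  addresses of pre-allocated ArrowArray structs
   * @param schemaAddrs addresses of pre-allocated ArrowSchema structs
   * @return number of rows in the exported batch, or -1 when exhausted
   */
  def next(arrayAddrs: Array[Long], schemaAddrs: Array[Long]): Int = {
    if (!input.hasNext) {
      -1
    } else {
      val batch = input.next()
      // Each column would be exported through Arrow FFI here (see the
      // Arrow section below); elided in this sketch.
      batch.numRows()
    }
  }
}
```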

`CometNativeExec` creates a `CometExecIterator` and applies this iterator to the input RDD
partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes
executing, the resulting Arrow batches are imported into the JVM using Arrow FFI.
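
A simplified view of that driver loop is sketched below. `Native.executePlan` is named in the text above, but the trait, signature, and end-of-stream convention used here are assumptions for illustration; the real code exchanges Arrow FFI struct addresses rather than returning batches directly.

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical stand-in for Comet's native entry point. The real
// Native.executePlan works with Arrow FFI struct addresses; returning a
// ColumnarBatch directly is a simplification.
trait NativeLike {
  /** Advance the native plan; returns the next batch, or null at end of stream. */
  def executePlan(planId: Long): ColumnarBatch
}

// Sketch of the JVM-side loop: each next() invokes the native plan and
// surfaces the resulting (FFI-imported) batch to Spark.
class ExecIteratorSketch(native: NativeLike, planId: Long)
    extends Iterator[ColumnarBatch] {
  private var pending: Option[ColumnarBatch] = None
  private var finished = false

  override def hasNext: Boolean = {
    if (pending.isEmpty && !finished) {
      pending = Option(native.executePlan(planId))
      finished = pending.isEmpty
    }
    pending.isDefined
  }

  override def next(): ColumnarBatch = {
    if (!hasNext) throw new NoSuchElementException("end of native stream")
    val batch = pending.get
    pending = None
    batch
  }
}
```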

## Shuffle

Spark uses a shuffle mechanism to transfer data between query stages. The shuffle mechanism is part of Spark Core
rather than Spark SQL and operates on RDDs.

### Shuffle Writes

For shuffle writes, a `ShuffleMapTask` runs in the executors. This task contains a `ShuffleDependency`, which is
broadcast to all of the executors. The task passes the input RDD to `ShuffleWriteProcessor.write()`, which
requests a `ShuffleWriter` from the shuffle manager; this is where Comet's shuffle writer is obtained.
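
Conceptually, the shuffle manager is the dispatch point: for shuffles that Comet supports it hands back a Comet writer, otherwise it delegates to Spark's built-in implementation. The sketch below uses Spark's real `ShuffleManager.getWriter` signature, but the predicate and the writer construction are placeholders rather than Comet's actual logic.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleManager, ShuffleWriteMetricsReporter, ShuffleWriter}

// Sketch of the dispatch point where Comet substitutes its own shuffle
// writer. The predicate and writer construction are placeholders; the real
// CometShuffleManager logic is more involved.
class DelegatingShuffleManagerSketch(delegate: ShuffleManager) {

  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
    if (isCometShuffle(handle)) {
      cometWriter(handle, mapId, context)
    } else {
      delegate.getWriter(handle, mapId, context, metrics)
    }
  }

  // Placeholder: in practice this inspects the handle's ShuffleDependency.
  private def isCometShuffle(handle: ShuffleHandle): Boolean = ???

  // Placeholder: constructs the Comet native shuffle writer.
  private def cometWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext): ShuffleWriter[K, V] = ???
}
```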

`ShuffleWriteProcessor` then computes the dependency's RDD partition and passes the resulting rows/batches to the shuffle writer.
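
Stripped of detail, the write path computes the partition's records from the dependency's RDD and streams them into whatever writer was returned, roughly as sketched below (Spark-side flow only; Comet writer internals are elided):

```scala
import org.apache.spark.{Partition, ShuffleDependency, TaskContext}
import org.apache.spark.shuffle.ShuffleWriter

// Simplified view of what ShuffleWriteProcessor.write() does: compute the
// records for this partition from the dependency's RDD, then stream them
// into whichever ShuffleWriter the shuffle manager returned.
object ShuffleWriteSketch {
  def writePartition[K, V](
      dep: ShuffleDependency[K, V, V],
      writer: ShuffleWriter[K, V],
      partition: Partition,
      context: TaskContext): Unit = {
    val records = dep.rdd.iterator(partition, context)
    writer.write(records)
    writer.stop(success = true)
  }
}
```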

As a result, we cannot avoid having one native plan to produce the shuffle input and another native plan for
writing the batches to the shuffle file.

### Shuffle Reads

For shuffle reads, a `ShuffledRDD` requests a `ShuffleReader` from the shuffle manager. Comet provides a
`CometBlockStoreShuffleReader`, implemented in the JVM, which fetches blocks from Spark and then creates an
`ArrowReaderIterator` to process the blocks, using Arrow's `StreamReader` to decode the IPC-encoded batches.
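
As an illustration of the read path, decoding a stream of IPC-encoded batches from a fetched block can be done with Arrow Java's `ArrowStreamReader`. This is a minimal sketch, not the actual `ArrowReaderIterator`, which also manages allocator lifetimes and converts each batch for Spark.

```scala
import java.io.InputStream

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamReader

// Minimal sketch of iterating over IPC-encoded batches in a shuffle block.
// Error handling and allocator lifecycle management are omitted.
class IpcBatchIteratorSketch(blockStream: InputStream)
    extends Iterator[VectorSchemaRoot] {
  private val allocator = new RootAllocator(Long.MaxValue)
  private val reader = new ArrowStreamReader(blockStream, allocator)
  private var hasBatch = false
  private var advanced = false

  override def hasNext: Boolean = {
    if (!advanced) {
      hasBatch = reader.loadNextBatch()
      advanced = true
      if (!hasBatch) reader.close()
    }
    hasBatch
  }

  override def next(): VectorSchemaRoot = {
    if (!hasNext) throw new NoSuchElementException("end of IPC stream")
    advanced = false
    // Note: the same VectorSchemaRoot is refilled on each loadNextBatch()
    // call, so callers must consume a batch before advancing.
    reader.getVectorSchemaRoot
  }
}
```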

## Arrow

Due to the hybrid execution model, it is necessary to pass batches of data between the JVM and native code.
Comet uses a combination of Arrow FFI and Arrow IPC to achieve this.

### Arrow FFI

The foundation for Arrow FFI is the [Arrow C Data Interface], which provides a stable ABI-compatible interface for
accessing Arrow data structures from multiple languages.

[Arrow C Data Interface]: https://arrow.apache.org/docs/format/CDataInterface.html
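
As a simplified example of the JVM side of an FFI handoff, the sketch below imports a batch that native code has exported into C Data Interface structs, given the struct addresses received over JNI. It uses Arrow Java's `org.apache.arrow.c` package; allocator management is simplified here, and a real implementation would share a long-lived allocator rather than creating one per batch.

```scala
import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot

// Sketch: import a batch exported by native code through the Arrow C Data
// Interface. arrayAddress/schemaAddress are raw struct addresses received
// over JNI; no data is copied, the JVM wraps the native buffers and takes
// over responsibility for releasing them.
object FfiImportSketch {
  def importBatch(arrayAddress: Long, schemaAddress: Long): VectorSchemaRoot = {
    val allocator = new RootAllocator(Long.MaxValue)
    val arrowArray = ArrowArray.wrap(arrayAddress)
    val arrowSchema = ArrowSchema.wrap(schemaAddress)
    try {
      // Consumes the C structs and produces a JVM-side batch view.
      Data.importVectorSchemaRoot(allocator, arrowArray, arrowSchema, null)
    } finally {
      // Release the struct wrappers once ownership has transferred.
      arrowArray.close()
      arrowSchema.close()
    }
  }
}
```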