SparkOutOfMemoryError happens when running CometColumnarExchange #886

Closed
Kontinuation opened this issue Aug 29, 2024 · 10 comments · Fixed by #1063
Labels: bug (Something isn't working)

Kontinuation (Member) commented Aug 29, 2024

Describe the bug

We frequently run into the following error when running queries with spark.comet.exec.shuffle.mode=jvm:

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 67108848 bytes of memory, got 65700208 bytes. Available: 65700208
	at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.allocate(CometShuffleMemoryAllocator.java:132)
	at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.allocatePage(CometShuffleMemoryAllocator.java:119)
	at org.apache.spark.sql.comet.execution.shuffle.SpillWriter.initialCurrentPage(SpillWriter.java:158)
	at org.apache.spark.shuffle.sort.CometShuffleExternalSorter.insertRecord(CometShuffleExternalSorter.java:368)
	at org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter.insertRecordIntoSorter(CometUnsafeShuffleWriter.java:278)
	at org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter.write(CometUnsafeShuffleWriter.java:206)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

We have observed this problem not only on our own workloads but also on the TPC-H benchmark: the exception above occurs when running TPC-H query 5 on Parquet files at scale factor 1000.

When we disabled the Comet shuffle manager and used Spark's own shuffle exchange instead, all TPC-H queries finished successfully.

Steps to reproduce

Run TPC-H query 5 on a Spark cluster. The environment and Spark configuration are listed under Additional context.

Expected behavior

All TPC-H queries should finish successfully.

Additional context

The problem was reproduced on a self-deployed Kubernetes (K8s) Spark cluster on AWS.

  • Driver/executor instance type: r7i.2xlarge (8 vCPUs, 64GB memory)
  • Executor pod resource limit: 6 vCPUs, 48GB memory (we reserved some resources for other uses)
  • Number of executor instances: 48
  • Spark version: 3.4.0
  • Java version: 17
  • Comet version: commit 9205f0d

Here is the relevant Spark configuration:

spark.executor.cores 6
spark.executor.memory 30719m
# Reserve native memory for comet, python and other stuff
spark.executor.memoryOverheadFactor 0.6
# Each executor core gets ~1.2 GB (0.04 * 30719m) of memory for Comet, so all 6 cores use ~7.2 GB.
# I know this is too small for Comet, but it should not prevent the query from finishing
spark.comet.memory.overhead.factor 0.04

spark.sql.extensions org.apache.comet.CometSparkSessionExtensions
spark.comet.enabled true
spark.comet.exec.enabled true
spark.comet.exec.all.enabled true
spark.comet.exec.shuffle.enabled true
spark.comet.exec.shuffle.mode jvm
spark.shuffle.manager org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
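As a rough check of the memory figures in this configuration, the sketch below recomputes the executor memory layout. It assumes each overhead is simply factor * spark.executor.memory truncated to whole MiB, and, following the configuration comment, treats the Comet budget as per core; Spark and Comet may apply minimums or rounding that this ignores. The class and method names are illustrative only.

```java
/**
 * Back-of-the-envelope executor memory layout for the configuration above.
 * Assumption: overhead = factor * spark.executor.memory, truncated to whole MiB.
 */
final class ExecutorMemoryLayout {
    /** Off-heap overhead derived from spark.executor.memoryOverheadFactor. */
    static long overheadMiB(long executorMemoryMiB, double overheadFactor) {
        return (long) (executorMemoryMiB * overheadFactor);
    }

    /** Total memory the executor pod must provide: heap plus overhead. */
    static long podMiB(long executorMemoryMiB, double overheadFactor) {
        return executorMemoryMiB + overheadMiB(executorMemoryMiB, overheadFactor);
    }

    /** Comet memory per core derived from spark.comet.memory.overhead.factor (assumed per core). */
    static long cometPerCoreMiB(long executorMemoryMiB, double cometFactor) {
        return (long) (executorMemoryMiB * cometFactor);
    }

    public static void main(String[] args) {
        long executorMemoryMiB = 30719; // spark.executor.memory
        int cores = 6;                  // spark.executor.cores
        long perCore = cometPerCoreMiB(executorMemoryMiB, 0.04);
        System.out.println("pod memory (MiB):              " + podMiB(executorMemoryMiB, 0.6));
        System.out.println("Comet memory per core (MiB):   " + perCore);
        System.out.println("Comet memory, all cores (MiB): " + perCore * cores);
    }
}
```

Under these assumptions the pod needs 30719 + 18431 = 49150 MiB, just under the 48 GiB (49152 MiB) pod limit, and Comet gets about 1228 MiB per core, roughly 7.2 GiB across 6 cores.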
Kontinuation added the bug label Aug 29, 2024
Kontinuation (Member, Author) commented Aug 29, 2024

CometShuffleMemoryAllocator allocates at most spark.comet.shuffle.memory.factor * spark.comet.memoryOverhead bytes in total for all Comet external sorters. The number of concurrently running external sorters is usually the number of executor cores. In this case, 1.2 GB * 0.7 ≈ 0.84 GB of shuffle memory is shared by all 6 cores.
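The arithmetic above can be made concrete with the page size from the stack trace (67108848 bytes, 16 bytes short of 64 MiB). This sketch assumes the shared budget is exactly spark.comet.shuffle.memory.factor * spark.comet.memoryOverhead, with the factor at 0.7 and spark.comet.memoryOverhead ≈ 1228 MiB (0.04 * 30719 MiB), split evenly across 6 cores. The real allocator's accounting may differ, and the class and method names are illustrative.

```java
/**
 * Estimates how many shuffle pages fit in the shared Comet shuffle budget.
 * Assumption: budget = shuffleMemoryFactor * cometOverheadBytes, shared evenly by all cores.
 */
final class ShuffleBudget {
    /** Page size requested in the SparkOutOfMemoryError message (~64 MiB). */
    static final long PAGE_BYTES = 67_108_848L;

    /** Shared budget for all Comet external sorters in one executor. */
    static long sharedBudgetBytes(long cometOverheadBytes, double shuffleMemoryFactor) {
        return (long) (cometOverheadBytes * shuffleMemoryFactor);
    }

    /** Even split of the shared budget, one sorter per core. */
    static long perCoreBytes(long sharedBudgetBytes, int cores) {
        return sharedBudgetBytes / cores;
    }

    /** Number of whole pages that fit in the given number of bytes. */
    static long pagesThatFit(long bytes) {
        return bytes / PAGE_BYTES;
    }

    public static void main(String[] args) {
        long cometOverheadBytes = 1228L * 1024 * 1024; // ~0.04 * 30719 MiB
        long shared = sharedBudgetBytes(cometOverheadBytes, 0.7);
        long perCore = perCoreBytes(shared, 6);
        System.out.printf("shared budget: %d bytes (%d pages)%n", shared, pagesThatFit(shared));
        System.out.printf("per core:      %d bytes (%d pages)%n", perCore, pagesThatFit(perCore));
    }
}
```

Under these assumptions the whole executor has room for only 13 pages, about 2 per core, so a few busy sorters holding extra pages can leave another sorter unable to get even its first page.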

By contrast, each of the other native Comet operators gets a dedicated GreedyMemoryPool of size spark.comet.memoryOverhead (assuming we are not using the unified memory manager introduced by #83). Compared with those operators, the Comet shuffle memory available to each core is too small unless spark.comet.columnar.shuffle.memorySize is also configured.

CometShuffleMemoryAllocator is a singleton, so all Comet external sorters allocate from one shared memory pool. A Comet external sorter can only spill itself when an allocation fails; CometShuffleMemoryAllocator cannot ask other memory consumers to spill to free up memory for the requesting consumer. If a Comet external sorter is using only a tiny amount of memory when an allocation fails, it has nothing to spill and can only throw a SparkOutOfMemoryError. Would it be feasible to create a dedicated CometShuffleMemoryAllocator for each shuffle writer? That seems the safer choice when operators can only spill themselves.
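The failure mode described above can be illustrated with a toy model. This sketch is hypothetical: Pool and Sorter are illustrative stand-ins, not Comet's CometShuffleMemoryAllocator or CometShuffleExternalSorter, and sizes are counted in pages. It assumes, as stated in the comment, that a sorter can only release its own pages when an allocation fails.

```java
/**
 * Toy model: self-spilling sorters on a shared pool vs. dedicated pools.
 * Pool and Sorter are hypothetical stand-ins; sizes are in pages.
 */
final class SelfSpillModel {
    /** A bounded pool; a failed acquisition never asks other consumers to spill. */
    static final class Pool {
        private final long capacity;
        private long used;

        Pool(long capacity) { this.capacity = capacity; }

        boolean tryAcquire(long pages) {
            if (used + pages > capacity) return false;
            used += pages;
            return true;
        }

        void release(long pages) { used -= pages; }
    }

    /** A consumer that can only spill its own pages when an allocation fails. */
    static final class Sorter {
        private final Pool pool;
        private long held;
        int spills;

        Sorter(Pool pool) { this.pool = pool; }

        void allocatePage() {
            if (pool.tryAcquire(1)) { held++; return; }
            // Self-spill: write our own pages to disk (modeled as releasing them), then retry once.
            pool.release(held);
            held = 0;
            spills++;
            if (!pool.tryAcquire(1)) {
                throw new IllegalStateException("Unable to acquire 1 page: pool held by other sorters");
            }
            held = 1;
        }
    }

    /** Shared pool: a busy sorter holds every page, so an idle sorter cannot get even one. */
    static boolean sharedPoolStarvesSmallSorter() {
        Pool shared = new Pool(13);
        Sorter busy = new Sorter(shared);
        for (int i = 0; i < 13; i++) busy.allocatePage();
        Sorter small = new Sorter(shared);
        try {
            small.allocatePage();
            return false;
        } catch (IllegalStateException e) {
            return true; // the small sorter had nothing of its own to spill
        }
    }

    /** Dedicated 2-page pool per writer: the sorter keeps making progress by spilling itself. */
    static int dedicatedPoolSpills(int allocations) {
        Sorter sorter = new Sorter(new Pool(2));
        for (int i = 0; i < allocations; i++) sorter.allocatePage();
        return sorter.spills;
    }

    public static void main(String[] args) {
        System.out.println("shared pool starves small sorter: " + sharedPoolStarvesSmallSorter());
        System.out.println("dedicated pool spills for 5 pages: " + dedicatedPoolSpills(5));
    }
}
```

In the shared case the failing sorter holds no pages, so spilling itself frees nothing and the only outcome is an out-of-memory error; with a dedicated 2-page pool, five page allocations simply cause two self-spills. This illustrates the proposal above, not necessarily the approach taken in the eventual fix.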

andygrove (Member) commented

@Kontinuation @viirya I am trying to reproduce this issue now, but I am not sure whether it has already been resolved by #988.

viirya (Member) commented Oct 30, 2024

I think so.

Kontinuation (Member, Author) commented

I don't think it is. This issue concerns the Java implementation of Comet columnar shuffle (spark.comet.exec.shuffle.mode=jvm). The native shuffle writer is not used in that mode, so the fix for the native shuffle writer may not resolve this issue.

viirya (Member) commented Oct 31, 2024

Oh, this is a separate issue.

andygrove self-assigned this Oct 31, 2024
andygrove (Member) commented

I have not been able to reproduce this issue yet. I am using the same Comet commit and so far have tested on a single-node cluster with this configuration:

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --conf spark.driver.memory=8G \
    --conf spark.executor.instances=2 \
    --conf spark.executor.memory=30719m \
    --conf spark.executor.memoryOverheadFactor=0.6 \
    --conf spark.comet.memory.overhead.factor=0.04 \
    --conf spark.executor.cores=6 \
    --conf spark.cores.max=12 \
    --conf spark.eventLog.enabled=true \
    --jars $COMET_JAR \
    --driver-class-path $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=auto \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    tpcbench.py \
    --name comet \
    --benchmark tpch \
    --data /mnt/bigdata/tpch/sf1000/ \
    --queries /home/andy/git/apache/datafusion-benchmarks/tpch/queries \
    --output . \
    --iterations 1

The query completes:

Query 5 returned 5 rows
Query 5 took 456.6006717681885 seconds

I am going to test on a two-node K8s cluster next.

andygrove (Member) commented

I wonder whether the issue is related to specifying spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions (originally the documented way to enable Comet) rather than using spark.plugins=org.apache.spark.CometPlugin, which would set the executor memory correctly.

andygrove (Member) commented

I could not reproduce this issue on K8s either. Here is the spark-submit command I used:

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --conf spark.driver.memory=8G \
    --conf spark.executor.instances=4 \
    --conf spark.executor.memory=30719m \
    --conf spark.executor.memoryOverheadFactor=0.6 \
    --conf spark.comet.memory.overhead.factor=0.04 \
    --conf spark.executor.cores=6 \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --deploy-mode cluster \
    --jars $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.driver.container.image=$COMET_IMAGE \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.benchmark-results-pv.options.claimName=benchmark-results-pvc \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.benchmark-results-pv.mount.path=/mnt/benchmark-results \
    --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.benchmark-results-pv.mount.readOnly=false \
    --conf spark.kubernetes.driver.volumes.hostPath.data.mount.path=/mnt/bigdata \
    --conf spark.kubernetes.driver.volumes.hostPath.data.options.path=/mnt/bigdata \
    --conf spark.kubernetes.driver.volumes.hostPath.data.options.readOnly=false \
    --conf spark.kubernetes.executor.container.image=$COMET_IMAGE \
    --conf spark.kubernetes.executor.volumes.hostPath.data.mount.path=/mnt/bigdata \
    --conf spark.kubernetes.executor.volumes.hostPath.data.options.path=/mnt/bigdata \
    --conf spark.eventLog.enabled=false \
    local:///tpcbench.py \
    --name comet \
    --benchmark tpch \
    --data /mnt/bigdata/tpch/sf1000/ \
    --queries /tpch \
    --output /mnt/benchmark-results \
    --iterations 1

@Kontinuation Do you have any suggestions for how I can reproduce this issue?

andygrove (Member) commented

I now see that I missed spark.comet.exec.shuffle.mode=jvm. Retesting...

andygrove (Member) commented

I can reproduce this now.
