Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce CometTaskMemoryManager and native side memory pool #83

Merged
merged 6 commits into from
Mar 6, 2024

Conversation

sunchao
Copy link
Member

@sunchao sunchao commented Feb 22, 2024

Which issue does this PR close?

Closes #34.

Rationale for this change

Currently Comet uses the default memory pool implementation in DataFusion, which is not aware of the memory manager on the JVM Spark side. Therefore, in the case when a Spark job has both Spark and Comet operators, we'd need to initialize two memory pools separately for each of them, and make sure there is enough budget in them. In addition, since we cannot trigger spilling from native to JVM, or vise versa, the budget need to be large enough which means Comet typically will need to use more memory than Spark does.

Since Spark already has a UnifiedMemoryManager, this PR proposes to create a new memory pool implementation which delegate calls to the JVM side UnifiedMemoryManager, which serves as the source of truth and serves memory acquisition and release from both JVM and native side.

What changes are included in this PR?

This PR introduces a CometMemoryPool class on the native side, overriding the default memory pool used by DF. This memory pool dispatches calls to Spark's TaskMemoryManager for acquiring and releasing memory.

The newly added memory pool will only be activated when spark.memory.offHeap.enabled is set to true. Otherwise, the behavior remains the same as before (and spark.executor.memoryOverhead need to be large enough for native execution). In TestBosonBase, spark.memory.offHeap.enabled is enabled so all the tests within Comet are tested with the new feature.

How are these changes tested?

All the existing tests are updated to use the new memory manager implementation.

@sunchao sunchao force-pushed the memory-manager-comet branch 2 times, most recently from 76002b9 to 26f4802 Compare February 22, 2024 21:58
@sunchao sunchao marked this pull request as ready for review February 23, 2024 22:15
@sunchao
Copy link
Member Author

sunchao commented Feb 23, 2024

cc @viirya

@viirya
Copy link
Member

viirya commented Feb 23, 2024

Thanks @sunchao. I will review this in next days (or week).

Copy link
Contributor

@advancedxy advancedxy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, this is in great shape. Left some minor comments.

@@ -103,6 +103,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
iterators: jobjectArray,
serialized_query: jbyteArray,
metrics_node: JObject,
task_memory_manager_obj: JObject,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about rename this to comet_task_memory_manager_obj?

When I first read the code, I thought it was the Spark's TaskMemoryManager object. However it's comet's CometTaskMemoryManager. It would be clear to call it comet_task_memory_manager_obj

Other occurrence could be renamed too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

.get("use_unified_memory_manager")
.ok_or(CometError::Internal(
"Config 'use_unified_memory_manager' is not specified from Comet JVM side".to_string(),
))?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe a more permissive way is to treat unsetting use_unified_memory_manager as false?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea sure, although this is an internal error from developer side if not set.

* share the same API as Spark and cannot trigger spill when acquire memory. Therefore, when
* acquiring memory from native or JVM, spilling can only be triggered from JVM operators.
*/
private class NativeMemoryConsumer extends MemoryConsumer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer's toString might be used when the debugging log is turned on.

It would be great that we can override this class to provide toString method and also add a unique flag/id to identify the corresponding consumer for the native plan/execution.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can add a toString for now and we can figure out how to use it for debugging purpose later.

cometBatchIterators,
protobufQueryPlan,
nativeMetrics,
new CometTaskMemoryManager)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm referring this. I think we can pass id to CometTaskMemoryManager and use that for identity mark.

conf.set("spark.shuffle.manager", shuffleManager)
conf.set(SQLConf.ANSI_ENABLED.key, "false")
conf.set(SHUFFLE_MANAGER, shuffleManager)
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we still going to test the default memory pool implementation in DataFusion?

Seems like all the test code path are the unified memory manager now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the long term I'm thinking to only use the memory pool defined in Comet. This currently requires users to turn on off-heap mode in Spark and set the off-heap memory accordingly, so configuration changes are necessary when they want to use Comet. Ideally we should be able to use DriverPlugin to override the memory settings so Comet may just work out of box (need to change Spark in a few places).

The default memory manager path is kept only for now until we are able to do the override through DriverPlugin. Internally we still run all the Spark SQL tests using the default memory manager, and can probably do the same here too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should be able to use DriverPlugin to override the memory settings so Comet may just work out of box (need to change Spark in a few places).

Oh. It seems that DriverPlugin is initialized before task scheduler. Which places in Spark do we need for CometDriverPlugin to override memory settings? The memory overhead is already override by comet.

and can probably do the same here too.

Is this still in this PR's scope?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. It seems that DriverPlugin is initialized before task scheduler. Which places in Spark do we need for CometDriverPlugin to override memory settings? The memory overhead is already override by comet.

I already made one change in Spark: apache/spark#45052 for this. We'll need a few more changes so we can completely overwrite executor memory setting through DriverPlugin.

Is this still in this PR's scope?

Not really. Will do that in #8

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already made one change in Spark: apache/spark#45052 for this. We'll need a few more changes so we can completely overwrite executor memory setting through DriverPlugin.

Great work. Looking forward that we can completely overwrite memory settings through DriverPlugin.

@sunchao sunchao force-pushed the memory-manager-comet branch from d58ab8d to 0cfb2f7 Compare February 28, 2024 18:53
Copy link
Contributor

@advancedxy advancedxy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM from my side, except the conflict should be resolved.

Copy link
Contributor

@snmvaughan snmvaughan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Good catch on check_exception in jni_call.

Comment on lines 61 to 87
fn grow(&self, _: &MemoryReservation, additional: usize) {
self.used.fetch_add(additional, Relaxed);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to update (acquire) the required memory from JVM memory manager?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think grow is not really used by DataFusion except in tests, that's why I didn't do it. But you are right, it's better to add it too for future proof.

Comment on lines 85 to 92
if acquired < additional as i64 {
return Err(DataFusionError::Execution(format!(
"Failed to acquire {} bytes, only got {}. Reserved: {}",
additional,
acquired,
self.reserved(),
)));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we fail to get the required number and return error, I think we should notify the JVM memory manager to release the allocated acquired bytes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will add.

use crate::jvm_bridge::get_global_jclass;

/// A DataFusion `MemoryPool` implementation for Comet, which delegate to the JVM
/// side `CometTaskMemoryManager`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment looks not correct as CometMemoryPool is the one implementing DataFusion MemoryPool. Maybe this should be moved to CometMemoryPool?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea let me move it.

Comment on lines +45 to +49
// Called by Comet native through JNI.
// Returns the actual amount of memory (in bytes) granted.
public long acquireMemory(long size) {
return internal.acquireExecutionMemory(size, nativeMemoryConsumer);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we claim Send and Sync for CometMemoryPool, should we make this synchronized method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think TaskMemoryManager is already synchronized on the acquireExecutionMemory and releaseExecutionMemory, so it doesn't seem necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh okay, then it is fine.

@@ -87,10 +89,11 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with SQLQueryTestH
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now replaced by

conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")

below

Comment on lines -82 to -83
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1g",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "1g",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For broadcast join threshold configs, should we keep them?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops I accidentally removed these when rebasing. Let me add them back.

@sunchao sunchao force-pushed the memory-manager-comet branch from 54db52b to dd67291 Compare March 4, 2024 19:24
@viirya
Copy link
Member

viirya commented Mar 5, 2024

@sunchao Do you see we use less memory (e.g., no more extra memory overhead needed) than before this patch when running TPCH/TPCDS queries on EKS?

@sunchao
Copy link
Member Author

sunchao commented Mar 6, 2024

I tried this with columnar shuffle sometime back and in TPC-DS there were other issues that caused OOM. I plan to try benchmarking this again soon. Let me merge this PR first and address remaining issues in follow-ups.

@sunchao sunchao merged commit e83635a into apache:main Mar 6, 2024
19 checks passed
@sunchao
Copy link
Member Author

sunchao commented Mar 6, 2024

Thanks, merged

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Introduce unified memory manager
4 participants