feat: Introduce CometTaskMemoryManager and native side memory pool #83

Conversation
Force-pushed from 76002b9 to 26f4802
cc @viirya
Thanks @sunchao. I will review this in the next few days (or week).
Wow, this is in great shape. Left some minor comments.
core/src/execution/jni_api.rs (Outdated)
@@ -103,6 +103,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
    iterators: jobjectArray,
    serialized_query: jbyteArray,
    metrics_node: JObject,
    task_memory_manager_obj: JObject,
How about renaming this to `comet_task_memory_manager_obj`? When I first read the code, I thought it was Spark's `TaskMemoryManager` object; however, it's Comet's `CometTaskMemoryManager`. It would be clearer to call it `comet_task_memory_manager_obj`. Other occurrences could be renamed too.
Sure
core/src/execution/jni_api.rs (Outdated)
.get("use_unified_memory_manager")
.ok_or(CometError::Internal(
    "Config 'use_unified_memory_manager' is not specified from Comet JVM side".to_string(),
))?
I believe a more permissive way would be to treat an unset `use_unified_memory_manager` as `false`?
Yeah, sure, although it is an internal error on the developer's side if this is not set.
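The permissive alternative under discussion could be sketched as below. This is illustrative only, not the actual Comet code; the helper name and the shape of the config map are assumptions:

```rust
use std::collections::HashMap;

// Hypothetical helper illustrating the permissive lookup: an unset
// "use_unified_memory_manager" is treated as `false` instead of an error.
fn use_unified_memory_manager(conf: &HashMap<String, String>) -> bool {
    conf.get("use_unified_memory_manager")
        .map(|v| v.eq_ignore_ascii_case("true"))
        .unwrap_or(false)
}
```

The trade-off is that a typo in the config key would silently fall back to the default pool instead of failing fast, which is why keeping the internal error is also defensible.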
 * share the same API as Spark and cannot trigger spill when acquiring memory. Therefore, when
 * acquiring memory from native or JVM, spilling can only be triggered from JVM operators.
 */
private class NativeMemoryConsumer extends MemoryConsumer {
The consumer's `toString` might be used when debug logging is turned on. It would be great if we could override `toString` in this class, and also add a unique flag/id to identify the corresponding consumer for the native plan/execution.
Sure, I can add a `toString` for now, and we can figure out how to use it for debugging purposes later.
cometBatchIterators,
protobufQueryPlan,
nativeMetrics,
new CometTaskMemoryManager)
I'm referring to this. I think we can pass an `id` to `CometTaskMemoryManager` and use that as the identity mark.
conf.set("spark.shuffle.manager", shuffleManager)
conf.set(SQLConf.ANSI_ENABLED.key, "false")
conf.set(SHUFFLE_MANAGER, shuffleManager)
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
Are we still going to test the default memory pool implementation in DataFusion? It seems like all the test code paths use the unified memory manager now.
In the long term I'm thinking of only using the memory pool defined in Comet. This currently requires users to turn on off-heap mode in Spark and set the off-heap memory accordingly, so configuration changes are necessary when they want to use Comet. Ideally we should be able to use `DriverPlugin` to override the memory settings so Comet may just work out of the box (this needs changes to Spark in a few places). The default memory manager path is kept only for now, until we are able to do the override through `DriverPlugin`. Internally we still run all the Spark SQL tests using the default memory manager, and can probably do the same here too.
> Ideally we should be able to use DriverPlugin to override the memory settings so Comet may just work out of box (need to change Spark in a few places).

Oh, it seems that `DriverPlugin` is initialized before the task scheduler. Which places in Spark do we need to change for `CometDriverPlugin` to override memory settings? The memory overhead is already overridden by Comet.

> and can probably do the same here too.

Is this still in this PR's scope?
> Oh. It seems that DriverPlugin is initialized before task scheduler. Which places in Spark do we need for CometDriverPlugin to override memory settings? The memory overhead is already override by comet.

I already made one change in Spark: apache/spark#45052 for this. We'll need a few more changes so we can completely overwrite the executor memory settings through `DriverPlugin`.

> Is this still in this PR's scope?

Not really. Will do that in #8
> I already made one change in Spark: apache/spark#45052 for this. We'll need a few more changes so we can completely overwrite executor memory setting through DriverPlugin.

Great work. Looking forward to being able to completely overwrite memory settings through DriverPlugin.
Force-pushed from d58ab8d to 0cfb2f7
LGTM from my side, except that the conflict should be resolved.
LGTM. Good catch on `check_exception` in `jni_call`.
core/src/execution/memory_pool.rs (Outdated)
fn grow(&self, _: &MemoryReservation, additional: usize) {
    self.used.fetch_add(additional, Relaxed);
}
Don't we need to update (acquire) the required memory from the JVM memory manager?
I think `grow` is not really used by DataFusion except in tests, which is why I didn't do it. But you are right, it's better to add it too for future-proofing.
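A minimal sketch of what a JVM-aware `grow` could look like. The `JvmMemoryManager` struct below is a plain stand-in for the JNI call into `CometTaskMemoryManager`; all names here are illustrative, not the actual implementation:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Stand-in for the JVM-side CometTaskMemoryManager; the real code would
// make a JNI call here instead of touching a local counter.
struct JvmMemoryManager {
    granted: AtomicUsize,
}

impl JvmMemoryManager {
    // This sketch assumes the JVM grants the full request.
    fn acquire_memory(&self, size: usize) -> usize {
        self.granted.fetch_add(size, Relaxed);
        size
    }
}

struct PoolSketch {
    jvm: JvmMemoryManager,
    used: AtomicUsize,
}

impl PoolSketch {
    // `grow` acquires from the JVM manager in addition to bumping the
    // local counter, keeping both sides' accounting in sync.
    fn grow(&self, additional: usize) {
        self.jvm.acquire_memory(additional);
        self.used.fetch_add(additional, Relaxed);
    }
}
```

Without the JVM call, the native pool's `used` counter would drift ahead of what the JVM memory manager believes has been handed out.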
core/src/execution/memory_pool.rs (Outdated)
if acquired < additional as i64 {
    return Err(DataFusionError::Execution(format!(
        "Failed to acquire {} bytes, only got {}. Reserved: {}",
        additional,
        acquired,
        self.reserved(),
    )));
}
When we fail to get the required number of bytes and return an error, I think we should notify the JVM memory manager to release the `acquired` bytes that were allocated.
Good point. Will add.
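The fix being agreed on here can be sketched as follows: on a partial grant, hand the acquired bytes back before surfacing the error. The `JvmBridge` type and method names are illustrative stand-ins for Comet's real JNI bridge, and a plain `String` error replaces `DataFusionError`:

```rust
use std::sync::atomic::{AtomicI64, Ordering::SeqCst};

// Illustrative stand-in for the JVM memory manager: it grants at most
// the bytes it has available, and can be told to release granted bytes.
struct JvmBridge {
    available: AtomicI64,
}

impl JvmBridge {
    fn acquire_memory(&self, size: i64) -> i64 {
        let granted = size.min(self.available.load(SeqCst));
        self.available.fetch_sub(granted, SeqCst);
        granted
    }

    fn release_memory(&self, size: i64) {
        self.available.fetch_add(size, SeqCst);
    }
}

// On a partial grant, release the partially acquired bytes so the JVM
// side's accounting stays consistent, then return the error.
fn try_grow(jvm: &JvmBridge, additional: i64) -> Result<(), String> {
    let acquired = jvm.acquire_memory(additional);
    if acquired < additional {
        jvm.release_memory(acquired);
        return Err(format!(
            "Failed to acquire {} bytes, only got {}",
            additional, acquired
        ));
    }
    Ok(())
}
```

Without the `release_memory` call on the failure path, the partially granted bytes would leak: the JVM would count them as in use even though the native side never recorded the reservation.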
use crate::jvm_bridge::get_global_jclass;

/// A DataFusion `MemoryPool` implementation for Comet, which delegate to the JVM
/// side `CometTaskMemoryManager`.
The comment looks incorrect, as `CometMemoryPool` is the one implementing DataFusion's `MemoryPool`. Maybe this comment should be moved to `CometMemoryPool`?
Yea let me move it.
// Called by Comet native through JNI.
// Returns the actual amount of memory (in bytes) granted.
public long acquireMemory(long size) {
  return internal.acquireExecutionMemory(size, nativeMemoryConsumer);
}
As we claim `Send` and `Sync` for `CometMemoryPool`, should we make this a synchronized method?
I think `TaskMemoryManager` is already synchronized on `acquireExecutionMemory` and `releaseExecutionMemory`, so it doesn't seem necessary.
Oh okay, then it is fine.
@@ -87,10 +89,11 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with SQLQueryTestH
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
We don't need it?
This is now replaced by

conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")

below.
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1g",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "1g",
For broadcast join threshold configs, should we keep them?
Oops, I accidentally removed these when rebasing. Let me add them back.
Force-pushed from 54db52b to dd67291
@sunchao Do you see Comet using less memory (e.g., no extra memory overhead needed) than before this patch when running TPC-H/TPC-DS queries on EKS?
I tried this with columnar shuffle some time back, and in TPC-DS there were other issues that caused OOMs. I plan to benchmark this again soon. Let me merge this PR first and address the remaining issues in follow-ups.
Thanks, merged.
Which issue does this PR close?
Closes #34.
Rationale for this change
Currently Comet uses the default memory pool implementation in DataFusion, which is not aware of the memory manager on the JVM Spark side. Therefore, when a Spark job has both Spark and Comet operators, we'd need to initialize two memory pools separately, one for each, and make sure there is enough budget in both. In addition, since we cannot trigger spilling from native to JVM, or vice versa, the budgets need to be large enough, which means Comet typically needs more memory than Spark does.
Since Spark already has a `UnifiedMemoryManager`, this PR proposes to create a new memory pool implementation which delegates calls to the JVM-side `UnifiedMemoryManager`. The latter serves as the source of truth and handles memory acquisition and release from both the JVM and native sides.

What changes are included in this PR?
This PR introduces a `CometMemoryPool` class on the native side, overriding the default memory pool used by DataFusion. This memory pool dispatches calls to Spark's `TaskMemoryManager` for acquiring and releasing memory.

The newly added memory pool is only activated when `spark.memory.offHeap.enabled` is set to `true`. Otherwise, the behavior remains the same as before (and `spark.executor.memoryOverhead` needs to be large enough for native execution). In `TestBosonBase`, `spark.memory.offHeap.enabled` is enabled, so all the tests within Comet run with the new feature.

How are these changes tested?
All the existing tests are updated to use the new memory manager implementation.