From 460ba7f4d1a86fe9fafc354d0d31e52b4f307785 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 2 Jan 2024 12:47:20 -0800 Subject: [PATCH 1/6] feat: Introduce `CometTaskMemoryManager` and native side memory pool --- .../apache/comet/CometOutOfMemoryError.java | 8 ++ core/src/errors.rs | 6 +- core/src/execution/jni_api.rs | 42 ++++++--- core/src/execution/memory_pool.rs | 85 +++++++++++++++++++ core/src/execution/mod.rs | 3 + .../jvm_bridge/comet_task_memory_manager.rs | 45 ++++++++++ core/src/jvm_bridge/mod.rs | 8 +- dev/ensure-jars-have-correct-contents.sh | 2 + .../apache/spark/CometTaskMemoryManager.java | 49 +++++++++++ .../org/apache/comet/CometExecIterator.scala | 14 ++- .../main/scala/org/apache/comet/Native.scala | 7 +- .../exec/CometColumnarShuffleSuite.scala | 3 +- .../spark/sql/CometTPCDSQuerySuite.scala | 5 +- .../spark/sql/CometTPCHQuerySuite.scala | 5 +- .../org/apache/spark/sql/CometTestBase.scala | 29 ++----- 15 files changed, 270 insertions(+), 41 deletions(-) create mode 100644 common/src/main/java/org/apache/comet/CometOutOfMemoryError.java create mode 100644 core/src/execution/memory_pool.rs create mode 100644 core/src/jvm_bridge/comet_task_memory_manager.rs create mode 100644 spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java diff --git a/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java b/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java new file mode 100644 index 000000000..00d818952 --- /dev/null +++ b/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java @@ -0,0 +1,8 @@ +package org.apache.comet; + +/** OOM error specific for Comet memory management */ +public class CometOutOfMemoryError extends OutOfMemoryError { + public CometOutOfMemoryError(String msg) { + super(msg); + } +} diff --git a/core/src/errors.rs b/core/src/errors.rs index e99af7aa6..1d5766cb9 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -101,7 +101,11 @@ pub enum CometError { #[from] source: 
std::num::ParseFloatError, }, - + #[error(transparent)] + BoolFormat { + #[from] + source: std::str::ParseBoolError, + }, #[error(transparent)] Format { #[from] diff --git a/core/src/execution/jni_api.rs b/core/src/execution/jni_api.rs index 1d55d3f92..2906f1d02 100644 --- a/core/src/execution/jni_api.rs +++ b/core/src/execution/jni_api.rs @@ -42,7 +42,7 @@ use jni::{ }; use std::{collections::HashMap, sync::Arc, task::Poll}; -use super::{serde, utils::SparkArrowConvert}; +use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, @@ -103,6 +103,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( iterators: jobjectArray, serialized_query: jbyteArray, metrics_node: JObject, + task_memory_manager_obj: JObject, ) -> jlong { try_unwrap_or_throw(&e, |mut env| { // Init JVM classes @@ -147,11 +148,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( let input_source = Arc::new(jni_new_global_ref!(env, input_source)?); input_sources.push(input_source); } + let task_memory_manager = Arc::new(jni_new_global_ref!(env, task_memory_manager_obj)?); // We need to keep the session context alive. Some session state like temporary // dictionaries are stored in session context. If it is dropped, the temporary // dictionaries will be dropped as well. - let session = prepare_datafusion_session_context(&configs)?; + let session = prepare_datafusion_session_context(&configs, task_memory_manager)?; let exec_context = Box::new(ExecutionContext { id, @@ -175,6 +177,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( /// Parse Comet configs and configure DataFusion session context. 
fn prepare_datafusion_session_context( conf: &HashMap, + task_memory_manager: Arc, ) -> CometResult { // Get the batch size from Comet JVM side let batch_size = conf @@ -186,18 +189,29 @@ fn prepare_datafusion_session_context( let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs); - // Set up memory limit if specified - if conf.contains_key("memory_limit") { - let memory_limit = conf.get("memory_limit").unwrap().parse::()?; - - let memory_fraction = conf - .get("memory_fraction") - .ok_or(CometError::Internal( - "Config 'memory_fraction' is not specified from Comet JVM side".to_string(), - ))? - .parse::()?; - - rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction); + let use_unified_memory_manager = conf + .get("use_unified_memory_manager") + .ok_or(CometError::Internal( + "Config 'use_unified_memory_manager' is not specified from Comet JVM side".to_string(), + ))? + .parse::()?; + + if use_unified_memory_manager { + // Set Comet memory pool for native + let memory_pool = CometMemoryPool::new(task_memory_manager); + rt_config = rt_config.with_memory_pool(Arc::new(memory_pool)); + } else { + // Use the memory pool from DF + if conf.contains_key("memory_limit") { + let memory_limit = conf.get("memory_limit").unwrap().parse::()?; + let memory_fraction = conf + .get("memory_fraction") + .ok_or(CometError::Internal( + "Config 'memory_fraction' is not specified from Comet JVM side".to_string(), + ))? 
+ .parse::()?; + rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction) + } } // Get Datafusion configuration from Spark Execution context diff --git a/core/src/execution/memory_pool.rs b/core/src/execution/memory_pool.rs new file mode 100644 index 000000000..a07bba58e --- /dev/null +++ b/core/src/execution/memory_pool.rs @@ -0,0 +1,85 @@ +use std::{ + fmt::{Debug, Formatter, Result as FmtResult}, + sync::{ + atomic::{AtomicUsize, Ordering::Relaxed}, + Arc, + }, +}; + +use jni::objects::GlobalRef; + +use datafusion::{ + common::DataFusionError, + execution::memory_pool::{MemoryPool, MemoryReservation}, +}; + +use crate::jvm_bridge::{jni_call, JVMClasses}; + +pub struct CometMemoryPool { + task_memory_manager_handle: Arc, + used: AtomicUsize, +} + +impl Debug for CometMemoryPool { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + f.debug_struct("CometMemoryPool") + .field("used", &self.used.load(Relaxed)) + .finish() + } +} + +impl CometMemoryPool { + pub fn new(task_memory_manager_handle: Arc) -> CometMemoryPool { + Self { + task_memory_manager_handle, + used: AtomicUsize::new(0), + } + } +} + +unsafe impl Send for CometMemoryPool {} +unsafe impl Sync for CometMemoryPool {} + +impl MemoryPool for CometMemoryPool { + fn grow(&self, _: &MemoryReservation, additional: usize) { + self.used.fetch_add(additional, Relaxed); + } + + fn shrink(&self, _: &MemoryReservation, size: usize) { + let mut env = JVMClasses::get_env(); + let handle = self.task_memory_manager_handle.as_obj(); + unsafe { + jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ()) + .unwrap(); + } + self.used.fetch_sub(size, Relaxed); + } + + fn try_grow(&self, _: &MemoryReservation, additional: usize) -> Result<(), DataFusionError> { + if additional > 0 { + let mut env = JVMClasses::get_env(); + let handle = self.task_memory_manager_handle.as_obj(); + unsafe { + let acquired = jni_call!(&mut env, + 
comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64)?; + + // If the number of bytes we acquired is less than the requested, return an error, + // and hopefully will trigger spilling from the caller side. + if acquired < additional as i64 { + return Err(DataFusionError::Execution(format!( + "Failed to acquire {} bytes, only got {}. Reserved: {}", + additional, + acquired, + self.reserved(), + ))); + } + } + self.used.fetch_add(additional, Relaxed); + } + Ok(()) + } + + fn reserved(&self) -> usize { + self.used.load(Relaxed) + } +} diff --git a/core/src/execution/mod.rs b/core/src/execution/mod.rs index 4c57ad8eb..b3be83b5f 100644 --- a/core/src/execution/mod.rs +++ b/core/src/execution/mod.rs @@ -29,6 +29,9 @@ pub(crate) mod sort; mod timezone; pub(crate) mod utils; +mod memory_pool; +pub use memory_pool::*; + // Include generated modules from .proto files. #[allow(missing_docs)] pub mod spark_expression { diff --git a/core/src/jvm_bridge/comet_task_memory_manager.rs b/core/src/jvm_bridge/comet_task_memory_manager.rs new file mode 100644 index 000000000..1cc1488f9 --- /dev/null +++ b/core/src/jvm_bridge/comet_task_memory_manager.rs @@ -0,0 +1,45 @@ +use jni::{ + errors::Result as JniResult, + objects::{JClass, JMethodID}, + signature::{Primitive, ReturnType}, + JNIEnv, +}; + +use crate::jvm_bridge::get_global_jclass; + +/// JNI class and method handles for the JVM-side `CometTaskMemoryManager`, through which +/// native code acquires and releases memory.
+#[derive(Debug)] +pub struct CometTaskMemoryManager<'a> { + pub class: JClass<'a>, + pub method_acquire_memory: JMethodID, + pub method_release_memory: JMethodID, + + pub method_acquire_memory_ret: ReturnType, + pub method_release_memory_ret: ReturnType, +} + +impl<'a> CometTaskMemoryManager<'a> { + pub const JVM_CLASS: &'static str = "org/apache/spark/CometTaskMemoryManager"; + + pub fn new(env: &mut JNIEnv<'a>) -> JniResult> { + let class = get_global_jclass(env, Self::JVM_CLASS)?; + + let result = CometTaskMemoryManager { + class, + method_acquire_memory: env.get_method_id( + Self::JVM_CLASS, + "acquireMemory", + "(J)J".to_string(), + )?, + method_release_memory: env.get_method_id( + Self::JVM_CLASS, + "releaseMemory", + "(J)V".to_string(), + )?, + method_acquire_memory_ret: ReturnType::Primitive(Primitive::Long), + method_release_memory_ret: ReturnType::Primitive(Primitive::Void), + }; + Ok(result) + } +} diff --git a/core/src/jvm_bridge/mod.rs b/core/src/jvm_bridge/mod.rs index 7a2882e30..41376f03b 100644 --- a/core/src/jvm_bridge/mod.rs +++ b/core/src/jvm_bridge/mod.rs @@ -73,7 +73,7 @@ macro_rules! jni_call { let ret = $env.call_method_unchecked($obj, method_id, ret_type, args); // Check if JVM has thrown any exception, and handle it if so. - let result = if let Some(exception) = $crate::jvm_bridge::check_exception($env)? { + let result = if let Some(exception) = $crate::jvm_bridge::check_exception($env).unwrap() { Err(exception.into()) } else { $crate::jvm_bridge::jni_map_error!($env, ret) @@ -194,10 +194,12 @@ mod comet_exec; pub use comet_exec::*; mod batch_iterator; mod comet_metric_node; +mod comet_task_memory_manager; use crate::{errors::CometError, JAVA_VM}; use batch_iterator::CometBatchIterator; pub use comet_metric_node::*; +pub use comet_task_memory_manager::*; /// The JVM classes that are used in the JNI calls. 
pub struct JVMClasses<'a> { @@ -216,6 +218,9 @@ pub struct JVMClasses<'a> { pub comet_exec: CometExec<'a>, /// The CometBatchIterator class. Used for iterating over the batches. pub comet_batch_iterator: CometBatchIterator<'a>, + /// The CometTaskMemoryManager used for interacting with JVM side to + /// acquire & release native memory. + pub comet_task_memory_manager: CometTaskMemoryManager<'a>, } unsafe impl<'a> Send for JVMClasses<'a> {} @@ -261,6 +266,7 @@ impl JVMClasses<'_> { comet_metric_node: CometMetricNode::new(env).unwrap(), comet_exec: CometExec::new(env).unwrap(), comet_batch_iterator: CometBatchIterator::new(env).unwrap(), + comet_task_memory_manager: CometTaskMemoryManager::new(env).unwrap(), } }); } diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh index 1ab09a5f8..5543093ff 100755 --- a/dev/ensure-jars-have-correct-contents.sh +++ b/dev/ensure-jars-have-correct-contents.sh @@ -80,6 +80,8 @@ allowed_expr+="|^org/apache/spark/shuffle/comet/.*$" allowed_expr+="|^org/apache/spark/sql/$" allowed_expr+="|^org/apache/spark/CometPlugin.class$" allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$" +allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$" +allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$" allowed_expr+=")" declare -i bad_artifacts=0 diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java new file mode 100644 index 000000000..c1b68de61 --- /dev/null +++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java @@ -0,0 +1,49 @@ +package org.apache.spark; + +import java.io.IOException; + +import org.apache.spark.memory.MemoryConsumer; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.memory.TaskMemoryManager; + +/** + * An adapter class that is used by Comet native to acquire & release memory through Spark's unified + * memory manager.
This assumes Spark's off-heap memory mode is enabled. + */ +public class CometTaskMemoryManager { + private final TaskMemoryManager internal; + private final NativeMemoryConsumer nativeMemoryConsumer; + + public CometTaskMemoryManager() { + this.internal = TaskContext$.MODULE$.get().taskMemoryManager(); + this.nativeMemoryConsumer = new NativeMemoryConsumer(); + } + + // Called by Comet native through JNI. + // Returns the actual amount of memory (in bytes) granted. + public long acquireMemory(long size) { + return internal.acquireExecutionMemory(size, nativeMemoryConsumer); + } + + // Called by Comet native through JNI + public void releaseMemory(long size) { + internal.releaseExecutionMemory(size, nativeMemoryConsumer); + } + + /** + * A dummy memory consumer that does nothing when spilling. At the moment, Comet native doesn't + * share the same API as Spark and cannot trigger spill when acquiring memory. Therefore, when + * acquiring memory from native or JVM, spilling can only be triggered from JVM operators.
+ */ + private class NativeMemoryConsumer extends MemoryConsumer { + protected NativeMemoryConsumer() { + super(CometTaskMemoryManager.this.internal, 0, MemoryMode.OFF_HEAP); + } + + @Override + public long spill(long size, MemoryConsumer trigger) throws IOException { + // No spilling + return 0; + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 20b2d384a..2d2a9976c 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -54,7 +54,13 @@ class CometExecIterator( }.toArray private val plan = { val configs = createNativeConf - nativeLib.createPlan(id, configs, cometBatchIterators, protobufQueryPlan, nativeMetrics) + nativeLib.createPlan( + id, + configs, + cometBatchIterators, + protobufQueryPlan, + nativeMetrics, + new CometTaskMemoryManager) } private var nextBatch: Option[ColumnarBatch] = None @@ -83,6 +89,12 @@ class CometExecIterator( val conf = SparkEnv.get.conf val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf) + // Only enable unified memory manager when off-heap mode is enabled. Otherwise, + // we'll use the built-in memory pool from DF, and initializes with `memory_limit` + // and `memory_fraction` below. 
+ result.put( + "use_unified_memory_manager", + String.valueOf(conf.get("spark.memory.offHeap.enabled", "false"))) result.put("memory_limit", String.valueOf(maxMemory)) result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get())) result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get())) diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala index 1564fc991..97ded91b2 100644 --- a/spark/src/main/scala/org/apache/comet/Native.scala +++ b/spark/src/main/scala/org/apache/comet/Native.scala @@ -21,6 +21,7 @@ package org.apache.comet import java.util.Map +import org.apache.spark.CometTaskMemoryManager import org.apache.spark.sql.comet.CometMetricNode class Native extends NativeBase { @@ -38,6 +39,9 @@ class Native extends NativeBase { * the bytes of serialized SparkPlan. * @param metrics * the native metrics of SparkPlan. + * @param taskMemoryManager + * the task-level memory manager that is responsible for tracking memory usage across JVM and + * native side. * @return * the address to native query plan. */ @@ -46,7 +50,8 @@ class Native extends NativeBase { configMap: Map[String, String], iterators: Array[CometBatchIterator], plan: Array[Byte], - metrics: CometMetricNode): Long + metrics: CometMetricNode, + taskMemoryManager: CometTaskMemoryManager): Long /** * Execute a native query plan based on given input Arrow arrays. 
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index fec6197d6..a9b29e6b7 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -53,7 +53,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString, CometConf.COMET_EXEC_ENABLED.key -> "false", CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") { testFun } } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index 65ea8ba64..ac0cc900e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql import org.apache.spark.SparkConf +import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER} +import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS import org.apache.comet.CometConf @@ -150,10 +152,11 @@ class CometTPCDSQuerySuite "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") - conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") + conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") + 
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") conf } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala index 9ea0218c2..b8cd11d4a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala @@ -25,9 +25,11 @@ import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession} import org.apache.comet.CometConf @@ -87,10 +89,11 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with SQLQueryTestH "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") - conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") + conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") + conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") } protected override def createSparkSession: TestSparkSession = { diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 38a8d7d2f..a1fb19898 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -23,9 +23,7 @@ import 
scala.concurrent.duration._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag -import org.scalactic.source.Position import org.scalatest.BeforeAndAfterEach -import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.parquet.column.ParquetProperties @@ -35,6 +33,7 @@ import org.apache.parquet.hadoop.ParquetWriter import org.apache.parquet.hadoop.example.ExampleParquetWriter import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark._ +import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER} import org.apache.spark.sql.comet.{CometBatchScanExec, CometBroadcastExchangeExec, CometExec, CometScanExec, CometScanWrapper, CometSinkPlaceHolder} import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec} import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, SparkPlan, WholeStageCodegenExec} @@ -65,28 +64,18 @@ abstract class CometTestBase val conf = new SparkConf() conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName) conf.set(SQLConf.SHUFFLE_PARTITIONS, 10) // reduce parallelism in tests - conf.set("spark.shuffle.manager", shuffleManager) + conf.set(SQLConf.ANSI_ENABLED.key, "false") + conf.set(SHUFFLE_MANAGER, shuffleManager) + conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") + conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") + conf.set(CometConf.COMET_ENABLED.key, "true") + conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") + conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") + conf.set(CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") conf } - override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit - pos: Position): Unit = { - super.test(testName, testTags: _*) { - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - 
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "2g", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1g", - SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "1g", - SQLConf.ANSI_ENABLED.key -> "false") { - testFun - } - } - } - /** * A helper function for comparing Comet DataFrame with Spark result using absolute tolerance. */ From 808214a8725a66d5d4a2cd899f7eb3331e8445ec Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 21 Feb 2024 17:20:56 -0800 Subject: [PATCH 2/6] add license header --- core/src/execution/memory_pool.rs | 17 +++++++++++++++++ .../src/jvm_bridge/comet_task_memory_manager.rs | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/core/src/execution/memory_pool.rs b/core/src/execution/memory_pool.rs index a07bba58e..f347e173c 100644 --- a/core/src/execution/memory_pool.rs +++ b/core/src/execution/memory_pool.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use std::{ fmt::{Debug, Formatter, Result as FmtResult}, sync::{ diff --git a/core/src/jvm_bridge/comet_task_memory_manager.rs b/core/src/jvm_bridge/comet_task_memory_manager.rs index 1cc1488f9..1f9ad07aa 100644 --- a/core/src/jvm_bridge/comet_task_memory_manager.rs +++ b/core/src/jvm_bridge/comet_task_memory_manager.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use jni::{ errors::Result as JniResult, objects::{JClass, JMethodID}, From 2ad7aca268817c65bcc284939f60a0f93e4810d7 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Thu, 22 Feb 2024 11:36:15 -0800 Subject: [PATCH 3/6] add license header --- .../org/apache/comet/CometOutOfMemoryError.java | 17 +++++++++++++++++ .../apache/spark/CometTaskMemoryManager.java | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java b/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java index 00d818952..31121a48e 100644 --- a/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java +++ b/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.comet; /** OOM error specific for Comet memory management */ diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java index c1b68de61..120434c04 100644 --- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java +++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark; import java.io.IOException; From a8ce1a98249d700c559d993fc6d77256fab12e52 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 28 Feb 2024 10:31:30 -0800 Subject: [PATCH 4/6] comments --- core/src/execution/jni_api.rs | 16 +++++++++------- .../org/apache/spark/CometTaskMemoryManager.java | 11 ++++++++++- .../org/apache/comet/CometExecIterator.scala | 2 +- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/core/src/execution/jni_api.rs b/core/src/execution/jni_api.rs index 2906f1d02..20f98a3a4 100644 --- a/core/src/execution/jni_api.rs +++ b/core/src/execution/jni_api.rs @@ -103,7 +103,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( iterators: jobjectArray, serialized_query: jbyteArray, metrics_node: JObject, - task_memory_manager_obj: JObject, + comet_task_memory_manager_obj: JObject, ) -> jlong { try_unwrap_or_throw(&e, |mut env| { // Init JVM classes @@ -148,7 +148,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( let input_source = Arc::new(jni_new_global_ref!(env, input_source)?); input_sources.push(input_source); } - let task_memory_manager = Arc::new(jni_new_global_ref!(env, task_memory_manager_obj)?); + let task_memory_manager = + Arc::new(jni_new_global_ref!(env, comet_task_memory_manager_obj)?); // We need to keep the session context alive. Some session state like temporary // dictionaries are stored in session context. If it is dropped, the temporary @@ -177,7 +178,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( /// Parse Comet configs and configure DataFusion session context. 
fn prepare_datafusion_session_context( conf: &HashMap, - task_memory_manager: Arc, + comet_task_memory_manager: Arc, ) -> CometResult { // Get the batch size from Comet JVM side let batch_size = conf @@ -189,16 +190,17 @@ fn prepare_datafusion_session_context( let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs); + // Check if we are using unified memory manager integrated with Spark. Default to false if not + // set. let use_unified_memory_manager = conf .get("use_unified_memory_manager") - .ok_or(CometError::Internal( - "Config 'use_unified_memory_manager' is not specified from Comet JVM side".to_string(), - ))? + .map(String::as_str) + .unwrap_or("false") .parse::()?; if use_unified_memory_manager { // Set Comet memory pool for native - let memory_pool = CometMemoryPool::new(task_memory_manager); + let memory_pool = CometMemoryPool::new(comet_task_memory_manager); rt_config = rt_config.with_memory_pool(Arc::new(memory_pool)); } else { // Use the memory pool from DF diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java index 120434c04..1933e4857 100644 --- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java +++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java @@ -28,10 +28,14 @@ * memory manager. This assumes Spark's off-heap memory mode is enabled. 
*/ public class CometTaskMemoryManager { + /** The id uniquely identifies the native plan this memory manager is associated to */ + private final long id; + private final TaskMemoryManager internal; private final NativeMemoryConsumer nativeMemoryConsumer; - public CometTaskMemoryManager() { + public CometTaskMemoryManager(long id) { + this.id = id; this.internal = TaskContext$.MODULE$.get().taskMemoryManager(); this.nativeMemoryConsumer = new NativeMemoryConsumer(); } @@ -62,5 +66,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { // No spilling return 0; } + + @Override + public String toString() { + return String.format("NativeMemoryConsumer(id=%)", id); + } } } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 2d2a9976c..b3604c9e0 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -60,7 +60,7 @@ class CometExecIterator( cometBatchIterators, protobufQueryPlan, nativeMetrics, - new CometTaskMemoryManager) + new CometTaskMemoryManager(id)) } private var nextBatch: Option[ColumnarBatch] = None From c2ce5361d64cfdaf58276775e34f0d41d2215b16 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 28 Feb 2024 10:53:18 -0800 Subject: [PATCH 5/6] fix format --- .../apache/comet/CometOutOfMemoryError.java | 26 ++++++++++--------- .../apache/spark/CometTaskMemoryManager.java | 26 ++++++++++--------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java b/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java index 31121a48e..8a9e8d1db 100644 --- a/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java +++ b/common/src/main/java/org/apache/comet/CometOutOfMemoryError.java @@ -1,18 +1,20 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ package org.apache.comet; diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java index 1933e4857..96fa3b432 100644 --- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java +++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java @@ -1,18 +1,20 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ package org.apache.spark; From dd6729179a3b3d8c6d29e17cb47c80565c16af70 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 4 Mar 2024 10:56:34 -0800 Subject: [PATCH 6/6] comments --- core/src/execution/memory_pool.rs | 61 ++++++++++++------- .../jvm_bridge/comet_task_memory_manager.rs | 4 +- .../spark/sql/CometTPCDSQuerySuite.scala | 3 +- .../spark/sql/CometTPCHQuerySuite.scala | 3 +- .../org/apache/spark/sql/CometTestBase.scala | 2 + 5 files changed, 45 insertions(+), 28 deletions(-) diff --git a/core/src/execution/memory_pool.rs b/core/src/execution/memory_pool.rs index f347e173c..ff2369095 100644 --- a/core/src/execution/memory_pool.rs +++ b/core/src/execution/memory_pool.rs @@ -30,8 +30,13 @@ use datafusion::{ execution::memory_pool::{MemoryPool, MemoryReservation}, }; -use crate::jvm_bridge::{jni_call, JVMClasses}; +use crate::{ + errors::CometResult, + jvm_bridge::{jni_call, JVMClasses}, +}; +/// A DataFusion `MemoryPool` implementation for Comet. Internally this is +/// implemented via delegating calls to [`crate::jvm_bridge::CometTaskMemoryManager`]. 
pub struct CometMemoryPool { task_memory_manager_handle: Arc, used: AtomicUsize, @@ -52,6 +57,23 @@ impl CometMemoryPool { used: AtomicUsize::new(0), } } + + fn acquire(&self, additional: usize) -> CometResult { + let mut env = JVMClasses::get_env(); + let handle = self.task_memory_manager_handle.as_obj(); + unsafe { + jni_call!(&mut env, + comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64) + } + } + + fn release(&self, size: usize) -> CometResult<()> { + let mut env = JVMClasses::get_env(); + let handle = self.task_memory_manager_handle.as_obj(); + unsafe { + jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ()) + } + } } unsafe impl Send for CometMemoryPool {} @@ -59,37 +81,32 @@ unsafe impl Sync for CometMemoryPool {} impl MemoryPool for CometMemoryPool { fn grow(&self, _: &MemoryReservation, additional: usize) { + self.acquire(additional) + .unwrap_or_else(|_| panic!("Failed to acquire {} bytes", additional)); self.used.fetch_add(additional, Relaxed); } fn shrink(&self, _: &MemoryReservation, size: usize) { - let mut env = JVMClasses::get_env(); - let handle = self.task_memory_manager_handle.as_obj(); - unsafe { - jni_call!(&mut env, comet_task_memory_manager(handle).release_memory(size as i64) -> ()) - .unwrap(); - } + self.release(size) + .unwrap_or_else(|_| panic!("Failed to release {} bytes", size)); self.used.fetch_sub(size, Relaxed); } fn try_grow(&self, _: &MemoryReservation, additional: usize) -> Result<(), DataFusionError> { if additional > 0 { - let mut env = JVMClasses::get_env(); - let handle = self.task_memory_manager_handle.as_obj(); - unsafe { - let acquired = jni_call!(&mut env, - comet_task_memory_manager(handle).acquire_memory(additional as i64) -> i64)?; + let acquired = self.acquire(additional)?; + // If the number of bytes we acquired is less than the requested, return an error, + // and hopefully will trigger spilling from the caller side. 
+ if acquired < additional as i64 { + // Release the acquired bytes before throwing error + self.release(acquired as usize)?; - // If the number of bytes we acquired is less than the requested, return an error, - // and hopefully will trigger spilling from the caller side. - if acquired < additional as i64 { - return Err(DataFusionError::Execution(format!( - "Failed to acquire {} bytes, only got {}. Reserved: {}", - additional, - acquired, - self.reserved(), - ))); - } + return Err(DataFusionError::Execution(format!( + "Failed to acquire {} bytes, only got {}. Reserved: {}", + additional, + acquired, + self.reserved(), + ))); } self.used.fetch_add(additional, Relaxed); } diff --git a/core/src/jvm_bridge/comet_task_memory_manager.rs b/core/src/jvm_bridge/comet_task_memory_manager.rs index 1f9ad07aa..a79a5b67d 100644 --- a/core/src/jvm_bridge/comet_task_memory_manager.rs +++ b/core/src/jvm_bridge/comet_task_memory_manager.rs @@ -24,8 +24,8 @@ use jni::{ use crate::jvm_bridge::get_global_jclass; -/// A DataFusion `MemoryPool` implementation for Comet, which delegate to the JVM -/// side `CometTaskMemoryManager`. +/// A wrapper which delegate acquire/release memory calls to the +/// JVM side `CometTaskMemoryManager`. 
#[derive(Debug)] pub struct CometTaskMemoryManager<'a> { pub class: JClass<'a>, diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index ac0cc900e..265235ffe 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql import org.apache.spark.SparkConf -import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER} -import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS +import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE} import org.apache.comet.CometConf diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala index b8cd11d4a..954269a8a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala @@ -25,11 +25,10 @@ import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER} +import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession} import org.apache.comet.CometConf diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 
a1fb19898..e31be8c28 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -68,6 +68,8 @@ abstract class CometTestBase conf.set(SHUFFLE_MANAGER, shuffleManager) conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") + conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") + conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")