From 655081b66808d8f70abebd4d85af7c401c579a3a Mon Sep 17 00:00:00 2001
From: KAZUYUKI TANIMURA
Date: Wed, 18 Dec 2024 09:10:34 -0800
Subject: [PATCH] test: enabling Spark tests with offHeap requirement (#1177)

## Which issue does this PR close?

## Rationale for this change

After https://github.com/apache/datafusion-comet/pull/1062, we have not been running Spark tests for native execution.

## What changes are included in this PR?

Removed the off-heap requirement for testing.

## How are these changes tested?

Brought back Spark tests for native execution.
---
 dev/diffs/4.0.0-preview1.diff                 | 103 +++++++++++++++++-
 native/core/src/execution/jni_api.rs          |  26 ++++-
 .../org/apache/comet/CometExecIterator.scala  |   9 +-
 .../comet/CometSparkSessionExtensions.scala   |  10 +-
 .../main/scala/org/apache/comet/Native.scala  |   5 +
 5 files changed, 142 insertions(+), 11 deletions(-)

diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index ba68d2a7b..db62ed607 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -146,6 +146,77 @@ index 698ca009b4f..57d774a3617 100644
 -- Test tables
 CREATE table explain_temp1 (key int, val int) USING PARQUET;
 
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+index 3a409eea348..26e9aaf215c 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int4.sql
+ --
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT4_TBL(f1 int) USING parquet;
+
+ -- [SPARK-28023] Trim the string when cast string type to other types
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+index fac23b4a26f..98b12ae5ccc 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+@@ -6,6 +6,10 @@
+ -- Test int8 64-bit integers.
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int8.sql
+ --
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ CREATE TABLE INT8_TBL(q1 bigint, q2 bigint) USING parquet;
+
+ -- PostgreSQL implicitly casts string literals to data with integral types, but
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+index 0efe0877e9b..f9df0400c99 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+@@ -6,6 +6,9 @@
+ -- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
+ --
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- load test data
+ CREATE TABLE test_having (a int, b int, c string, d string) USING parquet;
+ INSERT INTO test_having VALUES (0, 1, 'XXXX', 'A');
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+index e803254ea64..74db78aee38 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+@@ -1,6 +1,9 @@
+ -- This test suits check the spark.sql.viewSchemaBindingMode configuration.
+ -- It can be DISABLED and COMPENSATION
+
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ -- Verify the default binding is true
+ SET spark.sql.legacy.viewSchemaBindingMode;
+
+diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+index 21a3ce1e122..f4762ab98f0 100644
+--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
++++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+@@ -1,5 +1,9 @@
+ -- This test suite checks the WITH SCHEMA COMPENSATION clause
+ -- Disable ANSI mode to ensure we are forcing it explicitly in the CASTS
++
++-- TODO: https://github.com/apache/datafusion-comet/issues/551
++--SET spark.comet.enabled = false
++
+ SET spark.sql.ansi.enabled = false;
+
+ -- In COMPENSATION views get invalidated if the type can't cast
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
 index d023fb82185..0f4f03bda6c 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -917,7 +988,7 @@ index 34c6c49bc49..f5dea07a213 100644
 protected val baseResourcePath = {
   // use the same way as `SQLQueryTestSuite` to get the resource path
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 56c364e2084..a00a50e020a 100644
+index 56c364e2084..fc3abd7cdc4 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
 @@ -1510,7 +1510,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
   sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
 }
+@@ -4454,7 +4455,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+ }
+
+ test("SPARK-39166: Query context of binary arithmetic should be serialized to executors" +
+- " when WSCG is off") {
++ " when WSCG is off",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+ SQLConf.ANSI_ENABLED.key -> "true") {
+ withTable("t") {
+@@ -4475,7 +4477,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+ }
+
+ test("SPARK-39175: Query context of Cast should be serialized to executors" +
+- " when WSCG is off") {
++ " when WSCG is off",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+ SQLConf.ANSI_ENABLED.key -> "true") {
+ withTable("t") {
+@@ -4502,7 +4505,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
+ }
+
+ test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " +
+- "be serialized to executors when WSCG is off") {
++ "be serialized to executors when WSCG is off",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+ SQLConf.ANSI_ENABLED.key -> "true") {
+ withTable("t") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
 index 68f14f13bbd..174636cefb5 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 491b389c9..eb73675b5 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -106,6 +106,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     metrics_node: JObject,
     comet_task_memory_manager_obj: JObject,
     batch_size: jint,
+    use_unified_memory_manager: jboolean,
+    memory_limit: jlong,
+    memory_fraction: jdouble,
     debug_native: jboolean,
     explain_native: jboolean,
     worker_threads: jint,
@@ -147,7 +150,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     // We need to keep the session context alive. Some session state like temporary
     // dictionaries are stored in session context. If it is dropped, the temporary
     // dictionaries will be dropped as well.
-    let session = prepare_datafusion_session_context(batch_size as usize, task_memory_manager)?;
+    let session = prepare_datafusion_session_context(
+        batch_size as usize,
+        use_unified_memory_manager == 1,
+        memory_limit as usize,
+        memory_fraction,
+        task_memory_manager,
+    )?;
 
     let plan_creation_time = start.elapsed();
 
@@ -174,13 +183,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
 
 /// Configure DataFusion session context.
 fn prepare_datafusion_session_context(
     batch_size: usize,
+    use_unified_memory_manager: bool,
+    memory_limit: usize,
+    memory_fraction: f64,
     comet_task_memory_manager: Arc<GlobalRef>,
 ) -> CometResult<SessionContext> {
     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Set Comet memory pool for native
-    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+    // Check if we are using unified memory manager integrated with Spark.
+    if use_unified_memory_manager {
+        // Set Comet memory pool for native
+        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+    } else {
+        // Use the memory pool from DF
+        rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
+    }
 
     // Get Datafusion configuration from Spark Execution context
     // can be configured in Comet Spark JVM using Spark --conf parameters
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index d57e9e2b8..04d930695 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -60,6 +60,10 @@ class CometExecIterator(
       new CometBatchIterator(iterator, nativeUtil)
     }.toArray
   private val plan = {
+    val conf = SparkEnv.get.conf
+    // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
+    // we'll use the built-in memory pool from DF, initialized with `memory_limit`
+    // and `memory_fraction` below.
     nativeLib.createPlan(
       id,
       cometBatchIterators,
@@ -67,6 +71,9 @@ class CometExecIterator(
       nativeMetrics,
       new CometTaskMemoryManager(id),
       batchSize = COMET_BATCH_SIZE.get(),
+      use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false),
+      memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
+      memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
       debug = COMET_DEBUG_ENABLED.get(),
       explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
       workerThreads = COMET_WORKER_THREADS.get(),
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 61c45daff..8bff6b5fb 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.CometConf._
 import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.rules.RewriteJoin
 import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -921,8 +921,9 @@ class CometSparkSessionExtensions
 
   override def apply(plan: SparkPlan): SparkPlan = {
     // Comet required off-heap memory to be enabled
-    if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) {
-      logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false")
+    if (!isOffHeapEnabled(conf) && !isTesting) {
+      logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false")
+      withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false")
       return plan
     }
 
@@ -1174,8 +1175,7 @@ object CometSparkSessionExtensions extends Logging {
   }
 
   private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
-    conf.contains("spark.memory.offHeap.enabled") &&
-      conf.getConfString("spark.memory.offHeap.enabled").toBoolean
+    conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean
 
   // Copied from org.apache.spark.util.Utils which is private to Spark.
   private[comet] def isTesting: Boolean = {
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index 64ada91ad..083c0f2b5 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -43,6 +43,7 @@ class Native extends NativeBase {
    * @return
    *   the address to native query plan.
    */
+  // scalastyle:off
   @native def createPlan(
       id: Long,
       iterators: Array[CometBatchIterator],
@@ -50,10 +51,14 @@ class Native extends NativeBase {
       metrics: CometMetricNode,
       taskMemoryManager: CometTaskMemoryManager,
       batchSize: Int,
+      use_unified_memory_manager: Boolean,
+      memory_limit: Long,
+      memory_fraction: Double,
       debug: Boolean,
       explain: Boolean,
       workerThreads: Int,
       blockingThreads: Int): Long
+  // scalastyle:on
 
   /**
    * Execute a native query plan based on given input Arrow arrays.
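
---

Note for reviewers: the sketch below is illustrative only and is not part of this patch. It restates, in stand-alone form, how the three new `createPlan` arguments select DataFusion's memory pool inside `prepare_datafusion_session_context`. The function name `make_session_context` and the use of `GreedyMemoryPool` as a stand-in for Comet's Spark-backed `CometMemoryPool` are assumptions made for the example; the `RuntimeConfig` calls themselves (`with_disk_manager`, `with_memory_pool`, `with_memory_limit`) are the ones the patch uses.

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::GreedyMemoryPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

/// Illustrative version of the pool selection in this patch: pick the
/// memory pool for the native runtime based on `use_unified_memory_manager`.
fn make_session_context(
    batch_size: usize,
    use_unified_memory_manager: bool,
    memory_limit: usize,
    memory_fraction: f64,
) -> Result<SessionContext> {
    let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);

    if use_unified_memory_manager {
        // Off-heap path: Comet installs CometMemoryPool here so that native
        // allocations are tracked by Spark's unified memory manager.
        // GreedyMemoryPool is only a placeholder for that pool in this sketch.
        rt_config = rt_config.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)));
    } else {
        // On-heap/testing path: let DataFusion size its own pool from the
        // given limit and fraction.
        rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction);
    }

    let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
    let config = SessionConfig::new().with_batch_size(batch_size);
    Ok(SessionContext::new_with_config_rt(config, runtime))
}
```

With `spark.memory.offHeap.enabled=false` (the default in Spark's test suites), the `else` branch gives the native side a DataFusion-managed pool sized from `getCometMemoryOverhead(conf)` and `COMET_EXEC_MEMORY_FRACTION`, which is what allows the Spark tests to exercise native execution again without requiring off-heap memory.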