set concurrent tasks on batch size and GPU memory #12140

Open · wants to merge 2 commits into base: branch-25.04
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -38,7 +38,7 @@ Name | Description | Default Value | Applicable at
<a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk. Use -1 to set the amount to the combined size of pinned and pageable memory pools.|-1|Startup
<a name="memory.pinnedPool.size"></a>spark.rapids.memory.pinnedPool.size|The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|0|Startup
<a name="sql.batchSizeBytes"></a>spark.rapids.sql.batchSizeBytes|Set the target number of bytes for a GPU batch. Splits sizes for input data is covered by separate configs.|1073741824|Runtime
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|2|Runtime
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors or slow performance from spilling. If not set a value will be calculated using the GPUs memory and spark.rapids.sql.batchSizeBytes|None|Runtime
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true|Runtime
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NOT_ON_GPU|Runtime
<a name="sql.metrics.level"></a>spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE|Runtime
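As a worked illustration of the new default (the numbers here are hypothetical; the formula and the clamp to the range 1..4 come from the GpuSemaphore change below): with the default 1 GiB spark.rapids.sql.batchSizeBytes and a 16 GiB RMM pool, the derived value is min(4, 16 GiB / (4 * 1 GiB)) = 4 concurrent tasks, while a 6 GiB pool yields max(1, min(4, 6 GiB / (4 * 1 GiB))) = 1.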
@@ -52,6 +52,10 @@ object GpuDeviceManager extends Logging {
*/
def getNumCores: Int = numCores

private var memorySize = 0L

def getMemorySize: Long = memorySize

// Memory resource used only for cudf::chunked_pack to allocate scratch space
// during spill to host. This is done to set aside some memory for this operation
// from the beginning of the job.
@@ -322,6 +326,7 @@ object GpuDeviceManager extends Logging {

val info = Cuda.memGetInfo()
val poolAllocation = computeRmmPoolSize(conf, info)
memorySize = poolAllocation
var init = RmmAllocationMode.CUDA_DEFAULT
val features = ArrayBuffer[String]()
if (conf.isPooledMemEnabled) {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuTaskMetrics

@@ -138,8 +139,15 @@ object GpuSemaphore {
def computeNumPermits(conf: SQLConf): Int = {
val concurrentStr = conf.getConfString(RapidsConf.CONCURRENT_GPU_TASKS.key, null)
val concurrentInt = Option(concurrentStr)
.map(ConfHelper.toInteger(_, RapidsConf.CONCURRENT_GPU_TASKS.key))
.getOrElse(RapidsConf.CONCURRENT_GPU_TASKS.defaultValue)
.map(ConfHelper.toInteger(_, RapidsConf.CONCURRENT_GPU_TASKS.key).toInt)
.getOrElse {
val memory = GpuDeviceManager.getMemorySize
val batchStr = conf.getConfString(RapidsConf.GPU_BATCH_SIZE_BYTES.key, null)
val batchBytes = Option(batchStr)
.map(ConfHelper.byteFromString(_, ByteUnit.BYTE))
.getOrElse(RapidsConf.GPU_BATCH_SIZE_BYTES.defaultValue)
math.max(1, math.min(4, memory / (4 * batchBytes))).toInt
@winningsix (Collaborator) commented on Feb 17, 2025:
Should we keep consistent with the auto-tooler (4 -> 8), or just remove the suggestion logic there since it's already calculated from batch size and GPU memory?

In the auto-tooler, it uses gpuMemPerTaskMB (7500 MB, roughly 7.5 GB per task) and total memory to calculate the concurrent GPU task config.

}
// concurrentInt <= 0 is the same as 1 (fail to the safest value)
// concurrentInt > MAX_PERMITS becomes the same as MAX_PERMITS
// (who has more than 1000 threads anyways).
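To make the review discussion concrete, here is a small standalone Scala sketch (not part of the PR) that evaluates the new computeNumPermits default next to the auto-tooler heuristic described in the comment above. The 7500 MB-per-task figure and the cap of 8 are taken from that comment, and the pool and GPU sizes are purely illustrative.

// Illustrative only: compare the PR's derived default with the auto-tooler
// heuristic mentioned in the review comment above.
object ConcurrencyHeuristics {
  private val MiB = 1024L * 1024L
  private val GiB = 1024L * MiB

  // Mirrors the PR: math.max(1, math.min(4, memory / (4 * batchBytes)))
  def fromPoolAndBatch(poolBytes: Long, batchBytes: Long): Int =
    math.max(1L, math.min(4L, poolBytes / (4L * batchBytes))).toInt

  // Rough reading of the auto-tooler heuristic from the comment:
  // total GPU memory divided by ~7500 MB per task, capped here at 8.
  def fromAutoTooler(totalBytes: Long, perTaskBytes: Long = 7500 * MiB, cap: Long = 8L): Int =
    math.max(1L, math.min(cap, totalBytes / perTaskBytes)).toInt

  def main(args: Array[String]): Unit = {
    val batch = 1 * GiB // default spark.rapids.sql.batchSizeBytes
    println(fromPoolAndBatch(40 * GiB, batch)) // 40 / 4 = 10, clamped to 4
    println(fromAutoTooler(48 * GiB))          // 49152 MiB / 7500 MiB = 6
    println(fromPoolAndBatch(14 * GiB, batch)) // 14 / 4 = 3
    println(fromAutoTooler(16 * GiB))          // 16384 MiB / 7500 MiB = 2
  }
}

On larger GPUs the two heuristics can diverge, which is the question the comment raises.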
21 changes: 11 additions & 10 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -542,15 +542,6 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.stringConf
.createWithDefault("ASYNC")

val CONCURRENT_GPU_TASKS = conf("spark.rapids.sql.concurrentGpuTasks")
.doc("Set the number of tasks that can execute concurrently per GPU. " +
"Tasks may temporarily block when the number of concurrent tasks in the executor " +
"exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to " +
"GPU out of memory errors.")
.commonlyUsed()
.integerConf
.createWithDefault(2)

val GPU_BATCH_SIZE_BYTES = conf("spark.rapids.sql.batchSizeBytes")
.doc("Set the target number of bytes for a GPU batch. Splits sizes for input data " +
"is covered by separate configs.")
@@ -559,6 +550,16 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.checkValue(v => v > 0, "Batch size must be positive")
.createWithDefault(1 * 1024 * 1024 * 1024) // 1 GiB is the default

val CONCURRENT_GPU_TASKS = conf("spark.rapids.sql.concurrentGpuTasks")
.doc("Set the number of tasks that can execute concurrently per GPU. " +
"Tasks may temporarily block when the number of concurrent tasks in the executor " +
"exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to " +
"GPU out of memory errors or slow performance from spilling. If not set " +
s"a value will be calculated using the GPUs memory and $GPU_BATCH_SIZE_BYTES")
.commonlyUsed()
.integerConf
.createOptional

val CHUNKED_READER = conf("spark.rapids.sql.reader.chunked")
.doc("Enable a chunked reader where possible. A chunked reader allows " +
"reading highly compressed data that could not be read otherwise, but at the expense " +
@@ -2714,7 +2715,7 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val perTaskOverhead: Long = get(TASK_OVERHEAD_SIZE)

lazy val concurrentGpuTasks: Int = get(CONCURRENT_GPU_TASKS)
lazy val concurrentGpuTasks: Option[Integer] = get(CONCURRENT_GPU_TASKS)

lazy val isTestEnabled: Boolean = get(TEST_CONF)

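For completeness, a hypothetical usage sketch: since the config is now optional, the calculated default applies unless a user pins a value explicitly. The two config keys come from this PR and the existing docs; the session setup itself is standard Spark API, not something introduced here, and both keys are listed as Runtime configs in docs/configs.md.

import org.apache.spark.sql.SparkSession

// Hypothetical override of the auto-calculated default; an explicit value
// takes precedence over the memory/batch-size calculation.
val spark = SparkSession.builder()
  .appName("rapids-concurrency-override")
  .config("spark.rapids.sql.concurrentGpuTasks", "4")            // explicit value wins
  .config("spark.rapids.sql.batchSizeBytes", 512L * 1024 * 1024) // 512 MiB target batches
  .getOrCreate()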