Merge branch-25.02 to main [skip ci] #12219

Merged 5 commits on Feb 25, 2025
12 changes: 11 additions & 1 deletion .github/workflows/mvn-verify-check/get-deps-sha1.sh
@@ -20,9 +20,11 @@ scala_ver=${1:-"2.12"}
base_URL="https://oss.sonatype.org/service/local/artifact/maven/resolve"
project_jni="spark-rapids-jni"
project_private="rapids-4-spark-private_${scala_ver}"
project_hybrid="rapids-4-spark-hybrid_${scala_ver}"

jni_ver=$(mvn help:evaluate -q -pl dist -Dexpression=spark-rapids-jni.version -DforceStdout)
private_ver=$(mvn help:evaluate -q -pl dist -Dexpression=spark-rapids-private.version -DforceStdout)
hybrid_ver=$(mvn help:evaluate -q -pl dist -Dexpression=spark-rapids-hybrid.version -DforceStdout)

if [[ $jni_ver == *SNAPSHOT* ]]; then
jni_sha1=$(curl -s -H "Accept: application/json" \
@@ -40,6 +42,14 @@ else
private_sha1=$private_ver
fi

sha1md5=$(echo -n "${jni_sha1}_${private_sha1}" | md5sum | awk '{print $1}')
if [[ $hybrid_ver == *SNAPSHOT* ]]; then
hybrid_sha1=$(curl -s -H "Accept: application/json" \
"${base_URL}?r=snapshots&g=com.nvidia&a=${project_hybrid}&v=${hybrid_ver}&c=&e=jar&wt=json" \
| jq .data.sha1) || $(date +'%Y-%m-%d')
else
hybrid_sha1=$hybrid_ver
fi

sha1md5=$(echo -n "${jni_sha1}_${private_sha1}_${hybrid_sha1}" | md5sum | awk '{print $1}')

echo $sha1md5
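The script change extends the dependency fingerprint to cover the new hybrid jar: the SHA1 of `rapids-4-spark-hybrid` is resolved the same way as the JNI and private artifacts and folded into the MD5 key. For reference, the key is just an MD5 over the per-dependency identifiers joined with underscores; here is a minimal Scala sketch of that computation (the literal values below are illustrative, not real SHA1s):

```scala
import java.security.MessageDigest

object DepsFingerprint {
  // MD5 of the concatenated per-dependency identifiers, hex-encoded, mirroring:
  //   echo -n "${jni_sha1}_${private_sha1}_${hybrid_sha1}" | md5sum
  def md5Hex(s: String): String =
    MessageDigest.getInstance("MD5")
      .digest(s.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString

  def main(args: Array[String]): Unit = {
    // Illustrative values only; for SNAPSHOT dependencies the script resolves a
    // real SHA1 from the snapshot repository, otherwise it falls back to the
    // version string itself.
    val jniSha1     = "0123456789abcdef0123456789abcdef01234567"
    val privateSha1 = "25.02.0"
    val hybridSha1  = "25.02.0"
    println(md5Hex(s"${jniSha1}_${privateSha1}_${hybridSha1}"))
  }
}
```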
4 changes: 2 additions & 2 deletions docs/dev/hybrid-execution.md
@@ -53,5 +53,5 @@ the Rapids hybrid jar) in the classpath by specifying:
## Limitations
- Only supports V1 Parquet data source.
- Only supports Scala 2.12; Scala 2.13 is not supported.
- Support Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1 like [Gluten supports](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0),
other Spark versions 32x, 33x, 34x, 35x also work, but are not fully tested.
- Support Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1, matching [Gluten](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0).
Other Spark versions 32x, 33x, 34x, 35x may work, but are not fully tested.
6 changes: 3 additions & 3 deletions docs/download.md
@@ -94,12 +94,12 @@ The output of signature verify:
gpg: Good signature from "NVIDIA Spark (For the signature of spark-rapids release jars) <sw-spark@nvidia.com>"

### Release Notes
* Support Spark function Bin
* Improve spark metrics: Print the batch size information to executor log
* Support the Spark functions Bin and TruncDate
* Support group-limit optimization for ROW_NUMBER
* Improve Spark metrics: Print the batch size information to executor log
* Refine filter push down to avoid double evaluation
* Grab the GPU Semaphore when reading cached batch data with the GPU to avoid a GPU OOM case
* Add an option to disable measuring buffer copy to improve large shuffle large partition serialization
* Support group-limit optimization for ROW_NUMBER
* For updates on RAPIDS Accelerator Tools, please visit [this link](https://github.com/NVIDIA/spark-rapids-tools/releases)

Note: There is a known issue in the 25.02.0 release when decompressing gzip files on H100 GPUs.
2 changes: 1 addition & 1 deletion pom.xml
@@ -840,7 +840,7 @@
<spark.version.classifier>spark${buildver}</spark.version.classifier>
<cuda.version>cuda11</cuda.version>
<jni.classifier>${cuda.version}</jni.classifier>
<spark-rapids-jni.version>25.02.0</spark-rapids-jni.version>
<spark-rapids-jni.version>25.02.1-SNAPSHOT</spark-rapids-jni.version>
<spark-rapids-private.version>25.02.0</spark-rapids-private.version>
<spark-rapids-hybrid.version>25.02.0</spark-rapids-hybrid.version>
<scala.binary.version>2.12</scala.binary.version>
2 changes: 1 addition & 1 deletion scala2.13/pom.xml
@@ -840,7 +840,7 @@
<spark.version.classifier>spark${buildver}</spark.version.classifier>
<cuda.version>cuda11</cuda.version>
<jni.classifier>${cuda.version}</jni.classifier>
<spark-rapids-jni.version>25.02.0</spark-rapids-jni.version>
<spark-rapids-jni.version>25.02.1-SNAPSHOT</spark-rapids-jni.version>
<spark-rapids-private.version>25.02.0</spark-rapids-private.version>
<spark-rapids-hybrid.version>25.02.0</spark-rapids-hybrid.version>
<scala.binary.version>2.13</scala.binary.version>
@@ -2302,7 +2302,7 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
val CHUNKED_PACK_BOUNCE_BUFFER_COUNT = conf("spark.rapids.sql.chunkedPack.bounceBuffers")
.doc("Number of chunked pack bounce buffers, needed during spill from GPU to host memory. ")
.internal()
.longConf
.integerConf
.checkValue(v => v >= 1,
"The chunked pack bounce buffer count must be at least 1")
.createWithDefault(4)
@@ -2321,7 +2321,7 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
conf("spark.rapids.memory.host.spillToDiskBounceBuffers")
.doc("Number of bounce buffers used for gpu to disk spill that bypasses the host store.")
.internal()
.longConf
.integerConf
.checkValue(v => v >= 1,
"The gpu to disk spill bounce buffer count must be positive")
.createWithDefault(4)
@@ -3273,11 +3273,11 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val chunkedPackBounceBufferSize: Long = get(CHUNKED_PACK_BOUNCE_BUFFER_SIZE)

lazy val chunkedPackBounceBufferCount: Long = get(CHUNKED_PACK_BOUNCE_BUFFER_COUNT)
lazy val chunkedPackBounceBufferCount: Int = get(CHUNKED_PACK_BOUNCE_BUFFER_COUNT)

lazy val spillToDiskBounceBufferSize: Long = get(SPILL_TO_DISK_BOUNCE_BUFFER_SIZE)

lazy val spillToDiskBounceBufferCount: Long = get(SPILL_TO_DISK_BOUNCE_BUFFER_COUNT)
lazy val spillToDiskBounceBufferCount: Int = get(SPILL_TO_DISK_BOUNCE_BUFFER_COUNT)

lazy val splitUntilSizeOverride: Option[Long] = get(SPLIT_UNTIL_SIZE_OVERRIDE)

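The two bounce-buffer count settings switch from `.longConf` to `.integerConf` so the values read from the configuration arrive as `Int` and can be passed straight to the `Int`-typed pool constructor changed below, without narrowing casts. A minimal sketch of the idea using a simplified, hypothetical `ConfEntry` (not the real RapidsConf builder):

```scala
// Simplified, hypothetical typed config entry. The real RapidsConf builder chains
// .doc/.internal/.checkValue/.createWithDefault, but the key point is the same:
// the entry's type parameter decides what get() returns.
final case class ConfEntry[T](key: String, default: T, parse: String => T) {
  def get(settings: Map[String, String]): T =
    settings.get(key).map(parse).getOrElse(default)
}

object BounceBufferConfExample {
  // An Int-typed entry, analogous to .integerConf with createWithDefault(4).
  val chunkedPackBounceBufferCount: ConfEntry[Int] =
    ConfEntry("spark.rapids.sql.chunkedPack.bounceBuffers", 4, _.toInt)

  def main(args: Array[String]): Unit = {
    val settings = Map("spark.rapids.sql.chunkedPack.bounceBuffers" -> "8")
    // Typed as Int, so it can feed an Int parameter such as bbCount directly.
    val count: Int = chunkedPackBounceBufferCount.get(settings)
    println(s"bounce buffer count = $count")
  }
}
```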
@@ -22,7 +22,7 @@ import java.nio.channels.{Channels, FileChannel, WritableByteChannel}
import java.nio.file.StandardOpenOption
import java.util
import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

import scala.collection.mutable

@@ -1805,21 +1805,28 @@ private[spill] class BounceBuffer[T <: AutoCloseable](
* Callers should synchronize before calling close on their `DeviceMemoryBuffer`s.
*/
class BounceBufferPool[T <: AutoCloseable](private val bufSize: Long,
private val bbCount: Long,
private val bbCount: Int,
private val allocator: Long => T)
extends AutoCloseable with Logging {

private val pool = new LinkedBlockingQueue[BounceBuffer[T]]
for (_ <- 1L to bbCount) {
private val pool = new ArrayBlockingQueue[BounceBuffer[T]](bbCount)
for (_ <- 1 to bbCount) {
pool.offer(new BounceBuffer[T](allocator(bufSize), this))
}

def bufferSize: Long = bufSize
def nextBuffer(): BounceBuffer[T] = synchronized {
if (closed) {
logError("tried to acquire a bounce buffer after the" +
throw new IllegalStateException("tried to acquire a bounce buffer after the " +
"pool has been closed!")
}
while (pool.size() <= 0) {
wait()
if (closed) {
throw new IllegalStateException("tried to acquire a bounce buffer after the " +
"pool has been closed!")
}
}
pool.take()
}

@@ -1828,6 +1835,8 @@ class BounceBufferPool[T <: AutoCloseable](private val bufSize: Long,
buffer.release()
} else {
pool.offer(buffer)
// Wake up one thread to take the next bounce buffer
notify()
}
}

@@ -1842,6 +1851,8 @@ class BounceBufferPool[T <: AutoCloseable](private val bufSize: Long,

pool.forEach(_.release())
pool.clear()
// Wake up any threads that might still be waiting
notifyAll()
}
}
}
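The pool change replaces the unbounded `LinkedBlockingQueue` with a capacity-bounded `ArrayBlockingQueue` and makes `nextBuffer()` block, via `wait`/`notify` on the pool's monitor, until a buffer is returned or the pool is closed, instead of only logging an error. A self-contained sketch of that acquire/release/close pattern, with simplified names rather than the real `BounceBufferPool`:

```scala
import java.util.concurrent.ArrayBlockingQueue

// Simplified blocking pool: fixed capacity, callers wait when it is empty,
// release() wakes one waiter, close() wakes them all so they can fail fast.
class BlockingPool[T](items: Seq[T]) extends AutoCloseable {
  private val pool = new ArrayBlockingQueue[T](items.size)
  items.foreach(pool.offer)
  private var closed = false

  def acquire(): T = synchronized {
    if (closed) throw new IllegalStateException("pool has been closed")
    while (pool.isEmpty) {
      wait() // releases the monitor until release() or close() notifies
      if (closed) throw new IllegalStateException("pool has been closed")
    }
    pool.take() // does not block here: we hold the lock and saw a non-empty pool
  }

  def release(item: T): Unit = synchronized {
    pool.offer(item)
    notify() // wake one waiting acquirer
  }

  override def close(): Unit = synchronized {
    closed = true
    pool.clear()
    notifyAll() // wake every waiter so it observes the closed flag
  }
}
```

Throwing on a closed pool rather than logging surfaces misuse immediately, and waking all waiters in `close()` keeps threads from hanging forever on a pool that will never be refilled.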
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.jni.{RmmSpark, RmmSparkThreadState}
import com.nvidia.spark.rapids.spill._
import org.mockito.Mockito.when
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Ignore}
import org.scalatest.concurrent.{Signaler, TimeLimits}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time._
@@ -34,6 +34,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.TrampolineUtil

// Waiting for the fix of https://github.com/NVIDIA/spark-rapids/issues/12194
@Ignore
class HostAllocSuite extends AnyFunSuite with BeforeAndAfterEach with
BeforeAndAfterAll with TimeLimits {
private val sqlConf = new SQLConf()
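The whole suite is temporarily disabled with ScalaTest's class-level `@Ignore`: every test in the annotated class is skipped and reported as ignored. A tiny illustration of the pattern (the suite and test names here are made up), including the per-test `ignore(...)` alternative for narrower exclusions:

```scala
import org.scalatest.Ignore
import org.scalatest.funsuite.AnyFunSuite

// Class-level annotation: every test in this suite is skipped.
@Ignore
class FlakyExampleSuite extends AnyFunSuite {
  test("re-enable once the upstream issue is fixed") {
    assert(1 + 1 == 2)
  }
}

// Narrower alternative: disable a single case by swapping test(...) for ignore(...).
class PartiallyDisabledExampleSuite extends AnyFunSuite {
  ignore("temporarily disabled case") {
    assert(true)
  }
}
```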