diff --git a/.github/workflows/mvn-verify-check/get-deps-sha1.sh b/.github/workflows/mvn-verify-check/get-deps-sha1.sh index 2e3a3746edd..1fb8b197ac0 100755 --- a/.github/workflows/mvn-verify-check/get-deps-sha1.sh +++ b/.github/workflows/mvn-verify-check/get-deps-sha1.sh @@ -20,9 +20,11 @@ scala_ver=${1:-"2.12"} base_URL="https://oss.sonatype.org/service/local/artifact/maven/resolve" project_jni="spark-rapids-jni" project_private="rapids-4-spark-private_${scala_ver}" +project_hybrid="rapids-4-spark-hybrid_${scala_ver}" jni_ver=$(mvn help:evaluate -q -pl dist -Dexpression=spark-rapids-jni.version -DforceStdout) private_ver=$(mvn help:evaluate -q -pl dist -Dexpression=spark-rapids-private.version -DforceStdout) +hybrid_ver=$(mvn help:evaluate -q -pl dist -Dexpression=spark-rapids-hybrid.version -DforceStdout) if [[ $jni_ver == *SNAPSHOT* ]]; then jni_sha1=$(curl -s -H "Accept: application/json" \ @@ -40,6 +42,14 @@ else private_sha1=$private_ver fi -sha1md5=$(echo -n "${jni_sha1}_${private_sha1}" | md5sum | awk '{print $1}') +if [[ $hybrid_ver == *SNAPSHOT* ]]; then + hybrid_sha1=$(curl -s -H "Accept: application/json" \ + "${base_URL}?r=snapshots&g=com.nvidia&a=${project_hybrid}&v=${hybrid_ver}&c=&e=jar&wt=json" \ + | jq .data.sha1) || $(date +'%Y-%m-%d') +else + hybrid_sha1=$hybrid_ver +fi + +sha1md5=$(echo -n "${jni_sha1}_${private_sha1}_${hybrid_sha1}" | md5sum | awk '{print $1}') echo $sha1md5 diff --git a/docs/dev/hybrid-execution.md b/docs/dev/hybrid-execution.md index 7e561baf6c9..eb0949f95ee 100644 --- a/docs/dev/hybrid-execution.md +++ b/docs/dev/hybrid-execution.md @@ -53,5 +53,5 @@ the Rapids hybrid jar) in the classpath by specifying: ## Limitations - Only supports V1 Parquet data source. - Only supports Scala 2.12, do not support Scala 2.13. -- Support Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1 like [Gluten supports](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0), -other Spark versions 32x, 33x, 34x, 35x also work, but are not fully tested. +- Support Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1, matching [Gluten](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0). +Other Spark versions 32x, 33x, 34x, 35x may work, but are not fully tested. diff --git a/docs/download.md b/docs/download.md index 2fd6ae750c9..0369bc5b3d0 100644 --- a/docs/download.md +++ b/docs/download.md @@ -94,12 +94,12 @@ The output of signature verify: gpg: Good signature from "NVIDIA Spark (For the signature of spark-rapids release jars) " ### Release Notes -* Support Spark function Bin -* Improve spark metrics: Print the batch size information to executor log +* Support the Spark functions Bin and TruncDate +* Support group-limit optimization for ROW_NUMBER +* Improve Spark metrics: Print the batch size information to executor log * Refine filter push down to avoid double evaluation * Grab the GPU Semaphore when reading cached batch data with the GPU to avoid a GPU OOM case * Add an option to disable measuring buffer copy to improve large shuffle large partition serialization -* Support group-limit optimization for ROW_NUMBER * For updates on RAPIDS Accelerator Tools, please visit [this link](https://github.com/NVIDIA/spark-rapids-tools/releases) Note: There is a known issue in the 25.02.0 release when decompressing gzip files on H100 GPUs. diff --git a/pom.xml b/pom.xml index f5a6cc1cc70..51328764e75 100644 --- a/pom.xml +++ b/pom.xml @@ -840,7 +840,7 @@ spark${buildver} cuda11 ${cuda.version} - 25.02.0 + 25.02.1-SNAPSHOT 25.02.0 25.02.0 2.12 diff --git a/scala2.13/pom.xml b/scala2.13/pom.xml index 88e940eebfd..658137ecb77 100644 --- a/scala2.13/pom.xml +++ b/scala2.13/pom.xml @@ -840,7 +840,7 @@ spark${buildver} cuda11 ${cuda.version} - 25.02.0 + 25.02.1-SNAPSHOT 25.02.0 25.02.0 2.13 diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 304f26d6652..de9e92415cc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2302,7 +2302,7 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. val CHUNKED_PACK_BOUNCE_BUFFER_COUNT = conf("spark.rapids.sql.chunkedPack.bounceBuffers") .doc("Number of chunked pack bounce buffers, needed during spill from GPU to host memory. ") .internal() - .longConf + .integerConf .checkValue(v => v >= 1, "The chunked pack bounce buffer count must be at least 1") .createWithDefault(4) @@ -2321,7 +2321,7 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. conf("spark.rapids.memory.host.spillToDiskBounceBuffers") .doc("Number of bounce buffers used for gpu to disk spill that bypasses the host store.") .internal() - .longConf + .integerConf .checkValue(v => v >= 1, "The gpu to disk spill bounce buffer count must be positive") .createWithDefault(4) @@ -3273,11 +3273,11 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val chunkedPackBounceBufferSize: Long = get(CHUNKED_PACK_BOUNCE_BUFFER_SIZE) - lazy val chunkedPackBounceBufferCount: Long = get(CHUNKED_PACK_BOUNCE_BUFFER_COUNT) + lazy val chunkedPackBounceBufferCount: Int = get(CHUNKED_PACK_BOUNCE_BUFFER_COUNT) lazy val spillToDiskBounceBufferSize: Long = get(SPILL_TO_DISK_BOUNCE_BUFFER_SIZE) - lazy val spillToDiskBounceBufferCount: Long = get(SPILL_TO_DISK_BOUNCE_BUFFER_COUNT) + lazy val spillToDiskBounceBufferCount: Int = get(SPILL_TO_DISK_BOUNCE_BUFFER_COUNT) lazy val splitUntilSizeOverride: Option[Long] = get(SPLIT_UNTIL_SIZE_OVERRIDE) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillFramework.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillFramework.scala index 604e07fe0ff..fa3d5406d59 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillFramework.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillFramework.scala @@ -22,7 +22,7 @@ import java.nio.channels.{Channels, FileChannel, WritableByteChannel} import java.nio.file.StandardOpenOption import java.util import java.util.UUID -import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} +import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} import scala.collection.mutable @@ -1805,21 +1805,28 @@ private[spill] class BounceBuffer[T <: AutoCloseable]( * Callers should synchronize before calling close on their `DeviceMemoryBuffer`s. */ class BounceBufferPool[T <: AutoCloseable](private val bufSize: Long, - private val bbCount: Long, + private val bbCount: Int, private val allocator: Long => T) extends AutoCloseable with Logging { - private val pool = new LinkedBlockingQueue[BounceBuffer[T]] - for (_ <- 1L to bbCount) { + private val pool = new ArrayBlockingQueue[BounceBuffer[T]](bbCount) + for (_ <- 1 to bbCount) { pool.offer(new BounceBuffer[T](allocator(bufSize), this)) } def bufferSize: Long = bufSize def nextBuffer(): BounceBuffer[T] = synchronized { if (closed) { - logError("tried to acquire a bounce buffer after the" + + throw new IllegalStateException("tried to acquire a bounce buffer after the" + "pool has been closed!") } + while (pool.size() <= 0) { + wait() + if (closed) { + throw new IllegalStateException("tried to acquire a bounce buffer after the" + + "pool has been closed!") + } + } pool.take() } @@ -1828,6 +1835,8 @@ class BounceBufferPool[T <: AutoCloseable](private val bufSize: Long, buffer.release() } else { pool.offer(buffer) + // Wake up one thread to take the next bounce buffer + notify() } } @@ -1842,6 +1851,8 @@ class BounceBufferPool[T <: AutoCloseable](private val bufSize: Long, pool.forEach(_.release()) pool.clear() + // Wake up any threads that might be waiting still... + notifyAll() } } } diff --git a/sql-plugin/src/test/scala/com/nvidia/spark/rapids/HostAllocSuite.scala b/sql-plugin/src/test/scala/com/nvidia/spark/rapids/HostAllocSuite.scala index fbd61b9a7fb..faec9c4477f 100644 --- a/sql-plugin/src/test/scala/com/nvidia/spark/rapids/HostAllocSuite.scala +++ b/sql-plugin/src/test/scala/com/nvidia/spark/rapids/HostAllocSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.jni.{RmmSpark, RmmSparkThreadState} import com.nvidia.spark.rapids.spill._ import org.mockito.Mockito.when -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Ignore} import org.scalatest.concurrent.{Signaler, TimeLimits} import org.scalatest.funsuite.AnyFunSuite import org.scalatest.time._ @@ -34,6 +34,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.execution.TrampolineUtil +// Waiting for the fix of https://github.com/NVIDIA/spark-rapids/issues/12194 +@Ignore class HostAllocSuite extends AnyFunSuite with BeforeAndAfterEach with BeforeAndAfterAll with TimeLimits { private val sqlConf = new SQLConf()