Skip to content

Commit

Permalink
Have host spill use the new HostAlloc API (#9257)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Sep 21, 2023
1 parent d6c7db2 commit 2a8518e
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ private class HostAlloc(nonPinnedLimit: Long) {
private def canNeverSucceed(amount: Long, preferPinned: Boolean): Boolean = {
val pinnedFailed = (isPinnedOnly || preferPinned) && (amount > pinnedLimit)
val nonPinnedFailed = isPinnedOnly || (amount > nonPinnedLimit)
pinnedFailed && nonPinnedFailed
!isUnlimited && pinnedFailed && nonPinnedFailed
}

private def checkSize(amount: Long, preferPinned: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,11 @@ class RapidsBufferCopyIterator(buffer: RapidsBuffer)
}

override def close(): Unit = {
val hasNextBeforeClose = hasNext
val toClose = new ArrayBuffer[AutoCloseable]()
toClose.appendAll(chunkedPacker)
toClose.appendAll(Option(singleShotBuffer))

toClose.safeClose()
require(!hasNextBeforeClose,
"RapidsBufferCopyIterator was closed before exhausting")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,11 +775,14 @@ object RapidsBufferCatalog extends Logging {
rapidsConf.chunkedPackBounceBufferSize,
rapidsConf.spillToDiskBounceBufferSize)
diskBlockManager = new RapidsDiskBlockManager(conf)
val hostSpillStorageSize = if (rapidsConf.hostSpillStorageSize == -1) {
val hostSpillStorageSize = if (rapidsConf.offHeapLimitEnabled) {
// Disable the limit because it is handled by the RapidsHostMemoryStore
None
} else if (rapidsConf.hostSpillStorageSize == -1) {
// + 1 GiB by default to match backwards compatibility
rapidsConf.pinnedPoolSize + (1024 * 1024 * 1024)
Some(rapidsConf.pinnedPoolSize + (1024 * 1024 * 1024))
} else {
rapidsConf.hostSpillStorageSize
Some(rapidsConf.hostSpillStorageSize)
}
hostStorage = new RapidsHostMemoryStore(hostSpillStorageSize)
diskStorage = new RapidsDiskStore(diskBlockManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.channels.FileChannel.MapMode
import java.util.concurrent.ConcurrentHashMap

import ai.rapids.cudf.{Cuda, HostMemoryBuffer, MemoryBuffer}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

Expand Down Expand Up @@ -192,7 +192,7 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
val path = id.getDiskPath(diskBlockManager)
withResource(new FileInputStream(path)) { fis =>
val (header, hostBuffer) = SerializedHostTableUtils.readTableHeaderAndBuffer(fis)
val hostCols = closeOnExcept(hostBuffer) { _ =>
val hostCols = withResource(hostBuffer) { _ =>
SerializedHostTableUtils.buildHostColumns(header, hostBuffer, sparkTypes)
}
new ColumnarBatch(hostCols.toArray, header.getNumRows)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable

import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, MemoryBuffer, NvtxColor, NvtxRange, PinnedMemoryPool}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, MemoryBuffer, NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, freeOnExcept, withResource}
import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_SPILL_OFFSET}
import com.nvidia.spark.rapids.StorageTier.StorageTier
Expand All @@ -36,29 +36,14 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
/**
* A buffer store using host memory.
* @param maxSize maximum size in bytes for all buffers in this store
* @param pageableMemoryPoolSize maximum size in bytes for the internal pageable memory pool
*/
class RapidsHostMemoryStore(
maxSize: Long)
maxSize: Option[Long])
extends RapidsBufferStore(StorageTier.HOST) {

override protected def spillableOnAdd: Boolean = false

override def getMaxSize: Option[Long] = Some(maxSize)

private def allocateHostBuffer(
size: Long,
preferPinned: Boolean = true): HostMemoryBuffer = {
var buffer: HostMemoryBuffer = null
if (preferPinned) {
buffer = PinnedMemoryPool.tryAllocate(size)
if (buffer != null) {
return buffer
}
}

HostMemoryBuffer.allocate(size, false)
}
override def getMaxSize: Option[Long] = maxSize

def addBuffer(
id: RapidsBufferId,
Expand Down Expand Up @@ -102,21 +87,23 @@ class RapidsHostMemoryStore(
buffer: RapidsBuffer,
catalog: RapidsBufferCatalog,
stream: Cuda.Stream): Boolean = {
// this spillStore has a maximum size requirement (host only). We need to spill from it
// in order to make room for `buffer`.
val targetTotalSize = maxSize - buffer.memoryUsedBytes
if (targetTotalSize <= 0) {
// lets not spill to host when the buffer we are about
// to spill is larger than our limit
false
} else {
val amountSpilled = synchronousSpill(targetTotalSize, catalog, stream)
if (amountSpilled != 0) {
logDebug(s"Spilled $amountSpilled bytes from ${name} to make room for ${buffer.id}")
TrampolineUtil.incTaskMetricsDiskBytesSpilled(amountSpilled)
maxSize.forall { ms =>
// this spillStore has a maximum size requirement (host only). We need to spill from it
// in order to make room for `buffer`.
val targetTotalSize = ms - buffer.memoryUsedBytes
if (targetTotalSize < 0) {
// lets not spill to host when the buffer we are about
// to spill is larger than our limit
false
} else {
val amountSpilled = synchronousSpill(targetTotalSize, catalog, stream)
if (amountSpilled != 0) {
logDebug(s"Spilled $amountSpilled bytes from ${name} to make room for ${buffer.id}")
TrampolineUtil.incTaskMetricsDiskBytesSpilled(amountSpilled)
}
// if after spill we can fit the new buffer, return true
buffer.memoryUsedBytes <= (ms - currentSize)
}
// if after spill we can fit the new buffer, return true
buffer.memoryUsedBytes <= (maxSize - currentSize)
}
}

Expand All @@ -125,53 +112,58 @@ class RapidsHostMemoryStore(
catalog: RapidsBufferCatalog,
stream: Cuda.Stream): Option[RapidsBufferBase] = {
val wouldFit = trySpillToMaximumSize(other, catalog, stream)
// TODO: this is disabled for now since subsequent work will tie this into
// our host allocator apis.
if (false && !wouldFit) {
if (!wouldFit) {
// skip host
logWarning(s"Buffer ${other} with size ${other.memoryUsedBytes} does not fit " +
logWarning(s"Buffer $other with size ${other.memoryUsedBytes} does not fit " +
s"in the host store, skipping tier.")
None
} else {
withResource(other.getCopyIterator) { otherBufferIterator =>
val isChunked = otherBufferIterator.isChunked
val totalCopySize = otherBufferIterator.getTotalCopySize
closeOnExcept(allocateHostBuffer(totalCopySize)) { hostBuffer =>
withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ =>
var hostOffset = 0L
val start = System.nanoTime()
while (otherBufferIterator.hasNext) {
val otherBuffer = otherBufferIterator.next()
withResource(otherBuffer) { _ =>
otherBuffer match {
case devBuffer: DeviceMemoryBuffer =>
hostBuffer.copyFromMemoryBufferAsync(
hostOffset, devBuffer, 0, otherBuffer.getLength, stream)
hostOffset += otherBuffer.getLength
case _ =>
throw new IllegalStateException("copying from buffer without device memory")
closeOnExcept(HostAlloc.allocHighPriority(totalCopySize)) { hb =>
hb.map { hostBuffer =>
withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ =>
var hostOffset = 0L
val start = System.nanoTime()
while (otherBufferIterator.hasNext) {
val otherBuffer = otherBufferIterator.next()
withResource(otherBuffer) { _ =>
otherBuffer match {
case devBuffer: DeviceMemoryBuffer =>
hostBuffer.copyFromMemoryBufferAsync(
hostOffset, devBuffer, 0, otherBuffer.getLength, stream)
hostOffset += otherBuffer.getLength
case _ =>
throw new IllegalStateException("copying from buffer without device memory")
}
}
}
stream.sync()
val end = System.nanoTime()
val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong
val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong
logDebug(s"Spill to host (chunked=$isChunked) " +
s"size=$szMB MiB bandwidth=$bw MiB/sec")
}
stream.sync()
val end = System.nanoTime()
val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong
val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong
logDebug(s"Spill to host (chunked=$isChunked) " +
s"size=$szMB MiB bandwidth=$bw MiB/sec")
new RapidsHostMemoryBuffer(
other.id,
totalCopySize,
other.meta,
applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET),
hostBuffer)
}.orElse {
// skip host
logWarning(s"Buffer $other with size ${other.memoryUsedBytes} does not fit " +
s"in the host store, skipping tier.")
None
}
Some(new RapidsHostMemoryBuffer(
other.id,
totalCopySize,
other.meta,
applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET),
hostBuffer))
}
}
}
}

def numBytesFree: Long = maxSize - currentSize
def numBytesFree: Option[Long] = maxSize.map(_ - currentSize)

class RapidsHostMemoryBuffer(
id: RapidsBufferId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class RapidsBufferCatalogSuite extends AnyFunSuite with MockitoSugar {
withResource(spy(new RapidsDeviceMemoryStore)) { deviceStore =>
val mockStore = mock[RapidsBufferStore]
withResource(
new RapidsHostMemoryStore(10000)) { hostStore =>
new RapidsHostMemoryStore(Some(10000))) { hostStore =>
deviceStore.setSpillStore(hostStore)
hostStore.setSpillStore(mockStore)
val catalog = new RapidsBufferCatalog(deviceStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = spy(new RapidsBufferCatalog(devStore))
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) {
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) {
hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
Expand Down Expand Up @@ -102,7 +102,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = new RapidsBufferCatalog(devStore)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) {
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) {
hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) {
Expand Down Expand Up @@ -144,7 +144,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = new RapidsBufferCatalog(devStore)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) {
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) {
hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
Expand Down Expand Up @@ -288,7 +288,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = new RapidsBufferCatalog(devStore)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) { hostStore =>
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) { hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
hostStore.setSpillStore(diskStore)
Expand Down Expand Up @@ -340,7 +340,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
testBufferFileDeletion(canShareDiskPaths = true)
}

class AlwaysFailingRapidsHostMemoryStore extends RapidsHostMemoryStore(0L){
class AlwaysFailingRapidsHostMemoryStore extends RapidsHostMemoryStore(Some(0L)){
override def createBuffer(
other: RapidsBuffer,
catalog: RapidsBufferCatalog,
Expand All @@ -357,7 +357,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = new RapidsBufferCatalog(devStore)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) {
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) {
hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
Expand Down
Loading

0 comments on commit 2a8518e

Please sign in to comment.