Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have host spill use the new HostAlloc API #9257

Merged
merged 3 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ private class HostAlloc(nonPinnedLimit: Long) {
private def canNeverSucceed(amount: Long, preferPinned: Boolean): Boolean = {
val pinnedFailed = (isPinnedOnly || preferPinned) && (amount > pinnedLimit)
val nonPinnedFailed = isPinnedOnly || (amount > nonPinnedLimit)
pinnedFailed && nonPinnedFailed
!isUnlimited && pinnedFailed && nonPinnedFailed
}

private def checkSize(amount: Long, preferPinned: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,11 @@ class RapidsBufferCopyIterator(buffer: RapidsBuffer)
}

override def close(): Unit = {
val hasNextBeforeClose = hasNext
val toClose = new ArrayBuffer[AutoCloseable]()
toClose.appendAll(chunkedPacker)
toClose.appendAll(Option(singleShotBuffer))

toClose.safeClose()
require(!hasNextBeforeClose,
"RapidsBufferCopyIterator was closed before exhausting")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,11 +780,14 @@ object RapidsBufferCatalog extends Logging {
gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize)
deviceStorage.setSpillStore(gdsStorage)
} else {
val hostSpillStorageSize = if (rapidsConf.hostSpillStorageSize == -1) {
val hostSpillStorageSize = if (rapidsConf.offHeapLimitEnabled) {
// Disable the limit because it is handled by the RapidsHostMemoryStore
None
} else if (rapidsConf.hostSpillStorageSize == -1) {
// + 1 GiB by default to match backwards compatibility
rapidsConf.pinnedPoolSize + (1024 * 1024 * 1024)
Some(rapidsConf.pinnedPoolSize + (1024 * 1024 * 1024))
} else {
rapidsConf.hostSpillStorageSize
Some(rapidsConf.hostSpillStorageSize)
}
hostStorage = new RapidsHostMemoryStore(hostSpillStorageSize)
diskStorage = new RapidsDiskStore(diskBlockManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.channels.FileChannel.MapMode
import java.util.concurrent.ConcurrentHashMap

import ai.rapids.cudf.{Cuda, HostMemoryBuffer, MemoryBuffer}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

Expand Down Expand Up @@ -192,7 +192,7 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
val path = id.getDiskPath(diskBlockManager)
withResource(new FileInputStream(path)) { fis =>
val (header, hostBuffer) = SerializedHostTableUtils.readTableHeaderAndBuffer(fis)
val hostCols = closeOnExcept(hostBuffer) { _ =>
val hostCols = withResource(hostBuffer) { _ =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch

SerializedHostTableUtils.buildHostColumns(header, hostBuffer, sparkTypes)
}
new ColumnarBatch(hostCols.toArray, header.getNumRows)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable

import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, MemoryBuffer, NvtxColor, NvtxRange, PinnedMemoryPool}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, MemoryBuffer, NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, freeOnExcept, withResource}
import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_SPILL_OFFSET}
import com.nvidia.spark.rapids.StorageTier.StorageTier
Expand All @@ -39,26 +39,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* @param pageableMemoryPoolSize maximum size in bytes for the internal pageable memory pool
*/
class RapidsHostMemoryStore(
maxSize: Long)
maxSize: Option[Long])
extends RapidsBufferStore(StorageTier.HOST) {

override protected def spillableOnAdd: Boolean = false

override def getMaxSize: Option[Long] = Some(maxSize)

private def allocateHostBuffer(
size: Long,
preferPinned: Boolean = true): HostMemoryBuffer = {
var buffer: HostMemoryBuffer = null
if (preferPinned) {
buffer = PinnedMemoryPool.tryAllocate(size)
if (buffer != null) {
return buffer
}
}

HostMemoryBuffer.allocate(size, false)
}
override def getMaxSize: Option[Long] = maxSize

def addBuffer(
id: RapidsBufferId,
Expand Down Expand Up @@ -102,21 +88,23 @@ class RapidsHostMemoryStore(
buffer: RapidsBuffer,
catalog: RapidsBufferCatalog,
stream: Cuda.Stream): Boolean = {
// this spillStore has a maximum size requirement (host only). We need to spill from it
// in order to make room for `buffer`.
val targetTotalSize = maxSize - buffer.memoryUsedBytes
if (targetTotalSize <= 0) {
// lets not spill to host when the buffer we are about
// to spill is larger than our limit
false
} else {
val amountSpilled = synchronousSpill(targetTotalSize, catalog, stream)
if (amountSpilled != 0) {
logDebug(s"Spilled $amountSpilled bytes from ${name} to make room for ${buffer.id}")
TrampolineUtil.incTaskMetricsDiskBytesSpilled(amountSpilled)
maxSize.forall { ms =>
// this spillStore has a maximum size requirement (host only). We need to spill from it
// in order to make room for `buffer`.
val targetTotalSize = ms - buffer.memoryUsedBytes
if (targetTotalSize <= 0) {
// lets not spill to host when the buffer we are about
// to spill is larger than our limit
false
} else {
val amountSpilled = synchronousSpill(targetTotalSize, catalog, stream)
if (amountSpilled != 0) {
logDebug(s"Spilled $amountSpilled bytes from ${name} to make room for ${buffer.id}")
TrampolineUtil.incTaskMetricsDiskBytesSpilled(amountSpilled)
}
// if after spill we can fit the new buffer, return true
buffer.memoryUsedBytes <= (ms - currentSize)
}
// if after spill we can fit the new buffer, return true
buffer.memoryUsedBytes <= (maxSize - currentSize)
}
}

Expand All @@ -125,53 +113,58 @@ class RapidsHostMemoryStore(
catalog: RapidsBufferCatalog,
stream: Cuda.Stream): Option[RapidsBufferBase] = {
val wouldFit = trySpillToMaximumSize(other, catalog, stream)
// TODO: this is disabled for now since subsequent work will tie this into
// our host allocator apis.
if (false && !wouldFit) {
if (!wouldFit) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@revans2 this will change the behavior in 23.10. Before this, we would still spill to host. Now we are going to skip to disk if the buffer can't fit given the limit. Just making sure that was intended.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally think that is what we want, but if anyone has a different opinion I am all ears. We want to get to the point where we have a hard limit on host memory. The reason we don't want to make the changes piecemeal is to reduce the pain a customer might see when running a job. This change is only going to show up when a customer wants to spill from GPU memory, and it is larger than the spill store size. I think that is fairly rare, so I am willing to take the hit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default the host store is configured to 1GB + pinnedPoolSize, and pinnedPoolSize defaults to 0. If we raised the default to 2GB I'd agree; otherwise, it seems fairly common to bump up batchSizeBytes to 2GB, and those batches would spill to disk.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, with such a small spill store, we are bound to spill to disk very often anyway... so I don't know if we are saving too much.

// skip host
logWarning(s"Buffer ${other} with size ${other.memoryUsedBytes} does not fit " +
logWarning(s"Buffer $other with size ${other.memoryUsedBytes} does not fit " +
s"in the host store, skipping tier.")
None
} else {
withResource(other.getCopyIterator) { otherBufferIterator =>
val isChunked = otherBufferIterator.isChunked
val totalCopySize = otherBufferIterator.getTotalCopySize
closeOnExcept(allocateHostBuffer(totalCopySize)) { hostBuffer =>
withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ =>
var hostOffset = 0L
val start = System.nanoTime()
while (otherBufferIterator.hasNext) {
val otherBuffer = otherBufferIterator.next()
withResource(otherBuffer) { _ =>
otherBuffer match {
case devBuffer: DeviceMemoryBuffer =>
hostBuffer.copyFromMemoryBufferAsync(
hostOffset, devBuffer, 0, otherBuffer.getLength, stream)
hostOffset += otherBuffer.getLength
case _ =>
throw new IllegalStateException("copying from buffer without device memory")
closeOnExcept(HostAlloc.allocHighPriority(totalCopySize)) { hb =>
hb.map { hostBuffer =>
withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ =>
var hostOffset = 0L
val start = System.nanoTime()
while (otherBufferIterator.hasNext) {
val otherBuffer = otherBufferIterator.next()
withResource(otherBuffer) { _ =>
otherBuffer match {
case devBuffer: DeviceMemoryBuffer =>
hostBuffer.copyFromMemoryBufferAsync(
hostOffset, devBuffer, 0, otherBuffer.getLength, stream)
hostOffset += otherBuffer.getLength
case _ =>
throw new IllegalStateException("copying from buffer without device memory")
}
}
}
stream.sync()
val end = System.nanoTime()
val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong
val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong
logDebug(s"Spill to host (chunked=$isChunked) " +
s"size=$szMB MiB bandwidth=$bw MiB/sec")
}
stream.sync()
val end = System.nanoTime()
val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong
val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong
logDebug(s"Spill to host (chunked=$isChunked) " +
s"size=$szMB MiB bandwidth=$bw MiB/sec")
new RapidsHostMemoryBuffer(
other.id,
totalCopySize,
other.meta,
applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET),
hostBuffer)
}.orElse {
// skip host
logWarning(s"Buffer $other with size ${other.memoryUsedBytes} does not fit " +
s"in the host store, skipping tier.")
None
}
Some(new RapidsHostMemoryBuffer(
other.id,
totalCopySize,
other.meta,
applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET),
hostBuffer))
}
}
}
}

def numBytesFree: Long = maxSize - currentSize
def numBytesFree: Option[Long] = maxSize.map(_ - currentSize)

class RapidsHostMemoryBuffer(
id: RapidsBufferId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class RapidsBufferCatalogSuite extends AnyFunSuite with MockitoSugar {
withResource(spy(new RapidsDeviceMemoryStore)) { deviceStore =>
val mockStore = mock[RapidsBufferStore]
withResource(
new RapidsHostMemoryStore(10000)) { hostStore =>
new RapidsHostMemoryStore(Some(10000))) { hostStore =>
deviceStore.setSpillStore(hostStore)
hostStore.setSpillStore(mockStore)
val catalog = new RapidsBufferCatalog(deviceStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = spy(new RapidsBufferCatalog(devStore))
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) {
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) {
hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
Expand Down Expand Up @@ -102,7 +102,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = new RapidsBufferCatalog(devStore)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) {
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) {
hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) {
Expand Down Expand Up @@ -144,7 +144,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = new RapidsBufferCatalog(devStore)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) {
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) {
hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
Expand Down Expand Up @@ -288,7 +288,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = new RapidsBufferCatalog(devStore)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) { hostStore =>
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) { hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
hostStore.setSpillStore(diskStore)
Expand Down Expand Up @@ -340,7 +340,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
testBufferFileDeletion(canShareDiskPaths = true)
}

class AlwaysFailingRapidsHostMemoryStore extends RapidsHostMemoryStore(0L){
class AlwaysFailingRapidsHostMemoryStore extends RapidsHostMemoryStore(Some(0L)){
override def createBuffer(
other: RapidsBuffer,
catalog: RapidsBufferCatalog,
Expand All @@ -357,7 +357,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
withResource(new RapidsDeviceMemoryStore) { devStore =>
val catalog = new RapidsBufferCatalog(devStore)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) {
withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) {
hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
Expand Down
Loading