-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve host memory spill interfaces #10065
Changes from 2 commits
67dddae
05c2383
d9b4413
df46a5b
1493b30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -147,55 +147,61 @@ public ColumnarBatch next() { | |||||||||||||||||||||||||||||||||||||||||
// Update our estimate for number of rows with the final size used to allocate the buffers. | ||||||||||||||||||||||||||||||||||||||||||
numRowsEstimate = (int) bufsAndNumRows._2.targetSize(); | ||||||||||||||||||||||||||||||||||||||||||
long dataLength = calcDataLengthEstimate(numRowsEstimate); | ||||||||||||||||||||||||||||||||||||||||||
try ( | ||||||||||||||||||||||||||||||||||||||||||
SpillableHostBuffer sdb = bufsAndNumRows._1[0]; | ||||||||||||||||||||||||||||||||||||||||||
int used[]; | ||||||||||||||||||||||||||||||||||||||||||
try (SpillableHostBuffer sdb = bufsAndNumRows._1[0]; | ||||||||||||||||||||||||||||||||||||||||||
SpillableHostBuffer sob = bufsAndNumRows._1[1]; | ||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||
// Fill in buffer under write lock for host buffers | ||||||||||||||||||||||||||||||||||||||||||
batchAndRange = sdb.withHostBufferWriteLock( (dataBuffer) -> { | ||||||||||||||||||||||||||||||||||||||||||
return sob.withHostBufferWriteLock( (offsetsBuffer) -> { | ||||||||||||||||||||||||||||||||||||||||||
int[] used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate); | ||||||||||||||||||||||||||||||||||||||||||
int dataOffset = used[0]; | ||||||||||||||||||||||||||||||||||||||||||
int currentRow = used[1]; | ||||||||||||||||||||||||||||||||||||||||||
// We don't want to loop forever trying to copy nothing | ||||||||||||||||||||||||||||||||||||||||||
assert (currentRow > 0); | ||||||||||||||||||||||||||||||||||||||||||
if (numInputRows != null) { | ||||||||||||||||||||||||||||||||||||||||||
numInputRows.add(currentRow); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
if (numOutputRows != null) { | ||||||||||||||||||||||||||||||||||||||||||
numOutputRows.add(currentRow); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
if (numOutputBatches != null) { | ||||||||||||||||||||||||||||||||||||||||||
numOutputBatches.add(1); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
// Now that we have filled the buffers with the data, we need to turn them into a | ||||||||||||||||||||||||||||||||||||||||||
// HostColumnVector and copy them to the device so the GPU can turn it into a Table. | ||||||||||||||||||||||||||||||||||||||||||
// To do this we first need to make a HostColumnCoreVector for the data, and then | ||||||||||||||||||||||||||||||||||||||||||
// put that into a HostColumnVector as its child. This the basics of building up | ||||||||||||||||||||||||||||||||||||||||||
// a column of lists of bytes in CUDF but it is typically hidden behind the higer level | ||||||||||||||||||||||||||||||||||||||||||
// APIs. | ||||||||||||||||||||||||||||||||||||||||||
dataBuffer.incRefCount(); | ||||||||||||||||||||||||||||||||||||||||||
offsetsBuffer.incRefCount(); | ||||||||||||||||||||||||||||||||||||||||||
try (HostColumnVectorCore dataCv = | ||||||||||||||||||||||||||||||||||||||||||
new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L), | ||||||||||||||||||||||||||||||||||||||||||
dataBuffer, null, null, new ArrayList<>()); | ||||||||||||||||||||||||||||||||||||||||||
HostColumnVector hostColumn = new HostColumnVector(DType.LIST, | ||||||||||||||||||||||||||||||||||||||||||
currentRow, Optional.of(0L), null, null, | ||||||||||||||||||||||||||||||||||||||||||
offsetsBuffer, Collections.singletonList(dataCv))) { | ||||||||||||||||||||||||||||||||||||||||||
HostMemoryBuffer[] hBufs = getHostBuffersWithRetry(sdb, sob); | ||||||||||||||||||||||||||||||||||||||||||
try(HostMemoryBuffer dataBuffer = hBufs[0]; | ||||||||||||||||||||||||||||||||||||||||||
HostMemoryBuffer offsetsBuffer = hBufs[1]; | ||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||
used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
hBufs = getHostBuffersWithRetry(sdb, sob); | ||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not certain there is a good reason to have a second block here. I did it this way as a test, because previously I was hitting problems with the with-write-lock followed by with-read-only blocks. But at this point I think I could just combine these two blocks. |
||||||||||||||||||||||||||||||||||||||||||
try ( | ||||||||||||||||||||||||||||||||||||||||||
HostMemoryBuffer dataBuffer = hBufs[0]; | ||||||||||||||||||||||||||||||||||||||||||
HostMemoryBuffer offsetsBuffer = hBufs[1]; | ||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||
int dataOffset = used[0]; | ||||||||||||||||||||||||||||||||||||||||||
int currentRow = used[1]; | ||||||||||||||||||||||||||||||||||||||||||
// We don't want to loop forever trying to copy nothing | ||||||||||||||||||||||||||||||||||||||||||
assert (currentRow > 0); | ||||||||||||||||||||||||||||||||||||||||||
if (numInputRows != null) { | ||||||||||||||||||||||||||||||||||||||||||
numInputRows.add(currentRow); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
if (numOutputRows != null) { | ||||||||||||||||||||||||||||||||||||||||||
numOutputRows.add(currentRow); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
if (numOutputBatches != null) { | ||||||||||||||||||||||||||||||||||||||||||
numOutputBatches.add(1); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
// Now that we have filled the buffers with the data, we need to turn them into a | ||||||||||||||||||||||||||||||||||||||||||
// HostColumnVector and copy them to the device so the GPU can turn it into a Table. | ||||||||||||||||||||||||||||||||||||||||||
// To do this we first need to make a HostColumnCoreVector for the data, and then | ||||||||||||||||||||||||||||||||||||||||||
// put that into a HostColumnVector as its child. This the basics of building up | ||||||||||||||||||||||||||||||||||||||||||
// a column of lists of bytes in CUDF but it is typically hidden behind the higer level | ||||||||||||||||||||||||||||||||||||||||||
// APIs. | ||||||||||||||||||||||||||||||||||||||||||
dataBuffer.incRefCount(); | ||||||||||||||||||||||||||||||||||||||||||
offsetsBuffer.incRefCount(); | ||||||||||||||||||||||||||||||||||||||||||
try (HostColumnVectorCore dataCv = | ||||||||||||||||||||||||||||||||||||||||||
new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L), | ||||||||||||||||||||||||||||||||||||||||||
dataBuffer, null, null, new ArrayList<>()); | ||||||||||||||||||||||||||||||||||||||||||
HostColumnVector hostColumn = new HostColumnVector(DType.LIST, | ||||||||||||||||||||||||||||||||||||||||||
currentRow, Optional.of(0L), null, null, | ||||||||||||||||||||||||||||||||||||||||||
offsetsBuffer, Collections.singletonList(dataCv))) { | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
long ct = System.nanoTime() - collectStart; | ||||||||||||||||||||||||||||||||||||||||||
streamTime.add(ct); | ||||||||||||||||||||||||||||||||||||||||||
long ct = System.nanoTime() - collectStart; | ||||||||||||||||||||||||||||||||||||||||||
streamTime.add(ct); | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
// Grab the semaphore because we are about to put data onto the GPU. | ||||||||||||||||||||||||||||||||||||||||||
GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get()); | ||||||||||||||||||||||||||||||||||||||||||
NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN, | ||||||||||||||||||||||||||||||||||||||||||
Option.apply(opTime)); | ||||||||||||||||||||||||||||||||||||||||||
ColumnVector devColumn = | ||||||||||||||||||||||||||||||||||||||||||
RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice); | ||||||||||||||||||||||||||||||||||||||||||
return Tuple2.apply(makeSpillableBatch(devColumn), range); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
// Grab the semaphore because we are about to put data onto the GPU. | ||||||||||||||||||||||||||||||||||||||||||
GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get()); | ||||||||||||||||||||||||||||||||||||||||||
NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN, | ||||||||||||||||||||||||||||||||||||||||||
Option.apply(opTime)); | ||||||||||||||||||||||||||||||||||||||||||
ColumnVector devColumn = | ||||||||||||||||||||||||||||||||||||||||||
RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice); | ||||||||||||||||||||||||||||||||||||||||||
batchAndRange = Tuple2.apply(makeSpillableBatch(devColumn), range); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
try (NvtxRange ignored = batchAndRange._2; | ||||||||||||||||||||||||||||||||||||||||||
Table tab = | ||||||||||||||||||||||||||||||||||||||||||
|
@@ -208,6 +214,23 @@ public ColumnarBatch next() { | |||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
private HostMemoryBuffer[] getHostBuffersWithRetry(SpillableHostBuffer sdb, SpillableHostBuffer sob) { | ||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||
return RmmRapidsRetryIterator.withRetryNoSplit( () -> { | ||||||||||||||||||||||||||||||||||||||||||
HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null }; | ||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||
hBufs[0] = sdb.getHostBuffer(); | ||||||||||||||||||||||||||||||||||||||||||
hBufs[1] = sob.getHostBuffer(); | ||||||||||||||||||||||||||||||||||||||||||
return hBufs; | ||||||||||||||||||||||||||||||||||||||||||
} finally { | ||||||||||||||||||||||||||||||||||||||||||
// If the second buffer is null, we must have thrown, so close the first one. | ||||||||||||||||||||||||||||||||||||||||||
if ((hBufs[1] == null) && (hBufs[0] != null)) { | ||||||||||||||||||||||||||||||||||||||||||
hBufs[0].close(); | ||||||||||||||||||||||||||||||||||||||||||
hBufs[0] = null; | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to test for the exception implicitly, would this also work?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @gerashegalov. I have simplified this by just doing a try-with-resources for both buffers and incrementing the refcounts in the body so they are retained if didn't throw. |
||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||
private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize> | ||||||||||||||||||||||||||||||||||||||||||
allocBuffers(SpillableHostBuffer[] sBufs, AutoCloseableTargetSize numRowsWrapper) { | ||||||||||||||||||||||||||||||||||||||||||
HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null }; | ||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -647,6 +647,34 @@ class RapidsBufferCatalog( | |||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* Copies `buffer` to the `hostStorage` store, registering a new `RapidsBuffer` in | ||||||
* the process | ||||||
* | ||||||
* @param buffer - buffer to copy | ||||||
* @param stream - Cuda.Stream to synchronize on | ||||||
* @return - The `RapidsBuffer` instance that was added to the host store. | ||||||
*/ | ||||||
def unspillBufferToHostStore( | ||||||
buffer: RapidsBuffer, | ||||||
stream: Cuda.Stream): RapidsBuffer = synchronized { | ||||||
// try to acquire the buffer, if it's already in the store | ||||||
// do not create a new one, else add a reference | ||||||
acquireBuffer(buffer.id, StorageTier.HOST) match { | ||||||
case Some(existingBuffer) => existingBuffer | ||||||
case None => | ||||||
val maybeNewBuffer = hostStorage.copyBuffer(buffer, this, stream) | ||||||
maybeNewBuffer.map { newBuffer => | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This buffer is initially created as spillable. But it changes to unspillable when the caller does a getHostMemoryBuffer on it. Not sure if I should be concerned about this brief window of spillability? |
||||||
logDebug(s"got new RapidsHostMemoryStore buffer ${newBuffer.id}") | ||||||
newBuffer.addReference() // add a reference since we are about to use it | ||||||
updateTiers(BufferSpill(buffer, Some(newBuffer))) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The debug log output from update tiers make this sound like a spill, where in fact this is an unspill. Perhaps I should modify updateTiers to compare the storageTiers and change the log appropriately. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this was created only with spill in mind, not unspill. Could
|
||||||
buffer.safeFree() | ||||||
newBuffer | ||||||
}.get // the GPU store has to return a buffer here for now, or throw OOM | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
} | ||||||
|
||||||
|
||||||
/** | ||||||
* Remove a buffer ID from the catalog at the specified storage tier. | ||||||
* @note public for testing | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,62 +140,47 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) | |
meta: TableMeta, | ||
spillPriority: Long) | ||
extends RapidsBufferBase(id, meta, spillPriority) { | ||
private[this] var hostBuffer: Option[HostMemoryBuffer] = None | ||
|
||
// FIXME: Need to be clean up. Tracked in https://github.com/NVIDIA/spark-rapids/issues/9496 | ||
override val memoryUsedBytes: Long = uncompressedSize | ||
|
||
override val storageTier: StorageTier = StorageTier.DISK | ||
|
||
override def getMemoryBuffer: MemoryBuffer = synchronized { | ||
if (hostBuffer.isEmpty) { | ||
require(onDiskSizeInBytes > 0, | ||
s"$this attempted an invalid 0-byte mmap of a file") | ||
val path = id.getDiskPath(diskBlockManager) | ||
val serializerManager = diskBlockManager.getSerializerManager() | ||
val memBuffer = if (serializerManager.isRapidsSpill(id)) { | ||
// Only go through serializerManager's stream wrapper for spill case | ||
closeOnExcept(HostMemoryBuffer.allocate(uncompressedSize)) { decompressed => | ||
GpuTaskMetrics.get.readSpillFromDiskTime { | ||
withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c => | ||
c.position(fileOffset) | ||
withResource(Channels.newInputStream(c)) { compressed => | ||
withResource(serializerManager.wrapStream(id, compressed)) { in => | ||
withResource(new HostMemoryOutputStream(decompressed)) { out => | ||
IOUtils.copy(in, out) | ||
} | ||
decompressed | ||
require(onDiskSizeInBytes > 0, | ||
s"$this attempted an invalid 0-byte mmap of a file") | ||
val path = id.getDiskPath(diskBlockManager) | ||
val serializerManager = diskBlockManager.getSerializerManager() | ||
val memBuffer = if (serializerManager.isRapidsSpill(id)) { | ||
// Only go through serializerManager's stream wrapper for spill case | ||
closeOnExcept(HostAlloc.alloc(uncompressedSize)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding this HostAlloc.alloc() is what really increases how much heap memory I can get away with when running queries. This also requires that any code that ends up in here will likely need a retry. |
||
decompressed => GpuTaskMetrics.get.readSpillFromDiskTime { | ||
withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c => | ||
c.position(fileOffset) | ||
withResource(Channels.newInputStream(c)) { compressed => | ||
withResource(serializerManager.wrapStream(id, compressed)) { in => | ||
withResource(new HostMemoryOutputStream(decompressed)) { out => | ||
IOUtils.copy(in, out) | ||
} | ||
decompressed | ||
} | ||
} | ||
} | ||
} | ||
} else { | ||
// Reserved mmap read fashion for UCX shuffle path. Also it's skipping encryption and | ||
// compression. | ||
HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE, fileOffset, onDiskSizeInBytes) | ||
} | ||
hostBuffer = Some(memBuffer) | ||
} else { | ||
// Reserved mmap read fashion for UCX shuffle path. Also it's skipping encryption and | ||
// compression. | ||
HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE, fileOffset, onDiskSizeInBytes) | ||
} | ||
hostBuffer.foreach(_.incRefCount()) | ||
hostBuffer.get | ||
memBuffer | ||
} | ||
|
||
override def close(): Unit = synchronized { | ||
if (refcount == 1) { | ||
// free the memory mapping since this is the last active reader | ||
hostBuffer.foreach { b => | ||
logDebug(s"closing mmap buffer $b") | ||
b.close() | ||
} | ||
hostBuffer = None | ||
} | ||
super.close() | ||
} | ||
|
||
override protected def releaseResources(): Unit = { | ||
require(hostBuffer.isEmpty, | ||
"Releasing a disk buffer with non-empty host buffer") | ||
// Buffers that share paths must be cleaned up elsewhere | ||
if (id.canShareDiskPaths) { | ||
sharedBufferFiles.remove(id) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not introduced by this PR but since it touches them, can we update these variables to be more mnemonic than
sdb
andsob
to improve code readability