Improve host memory spill interfaces #10065
Conversation
Signed-off-by: Jim Brennan <jimb@nvidia.com>
build
) {
  used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate);
}
hBufs = getHostBuffersWithRetry(sdb, sob);
I'm not certain there is a good reason to have a second block here. I did it this way as a test, because previously I was hitting problems with the with-write-lock followed by with-read-only blocks. But at this point I think I could just combine these two blocks.
case Some(existingBuffer) => existingBuffer
case None =>
  val maybeNewBuffer = hostStorage.copyBuffer(buffer, this, stream)
  maybeNewBuffer.map { newBuffer =>
This buffer is initially created as spillable. But it changes to unspillable when the caller does a getHostMemoryBuffer on it. Not sure if I should be concerned about this brief window of spillability?
maybeNewBuffer.map { newBuffer =>
  logDebug(s"got new RapidsHostMemoryStore buffer ${newBuffer.id}")
  newBuffer.addReference() // add a reference since we are about to use it
  updateTiers(BufferSpill(buffer, Some(newBuffer)))
The debug log output from updateTiers makes this sound like a spill, when in fact this is an unspill. Perhaps I should modify updateTiers to compare the storage tiers and change the log message appropriately.
This was created only with spill in mind, not unspill. Could BufferSpill be part of a hierarchy of classes: a SpillAction, where BufferSpill is a SpillAction and BufferUnspill is also a SpillAction?
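A minimal sketch of that shape, assuming the existing RapidsBuffer type (SpillAction and BufferUnspill here are just the proposed names, not existing code):

sealed trait SpillAction {
  // the buffer that was copied out of the source store
  def buffer: RapidsBuffer
  // the copy created in the target store, if the copy succeeded
  def newBuffer: Option[RapidsBuffer]
}

case class BufferSpill(buffer: RapidsBuffer, newBuffer: Option[RapidsBuffer])
  extends SpillAction

case class BufferUnspill(buffer: RapidsBuffer, newBuffer: Option[RapidsBuffer])
  extends SpillAction

updateTiers could then take a SpillAction and pick the spill/unspill wording for its debug log based on the subtype, which would also address the logging concern above.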
val serializerManager = diskBlockManager.getSerializerManager()
val memBuffer = if (serializerManager.isRapidsSpill(id)) {
  // Only go through serializerManager's stream wrapper for spill case
  closeOnExcept(HostAlloc.alloc(uncompressedSize)) {
Adding this HostAlloc.alloc() is what really increases how much heap memory I can get away with when running queries. It also means that any code that ends up in here will likely need a retry.
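As a rough illustration of what that means for callers (a sketch only: diskBuffer is a placeholder name, and withRetryNoSplit stands in for whichever entry point of the plugin's retry framework fits here):

// getMemoryBuffer can now fail with a retryable OOM because of the
// HostAlloc.alloc() it performs internally, so callers wrap it in a retry
// block that can free up host memory (e.g. by spilling) and try again.
val hostBuffer = RmmRapidsRetryIterator.withRetryNoSplit {
  diskBuffer.getMemoryBuffer
}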
case _ =>
  throw new IllegalStateException("copying from buffer without device memory")
// If the other is from the local disk store, we are unspilling to host memory.
if (other.storageTier == StorageTier.DISK) {
I originally tested this without this block for StorageTier.DISK, and it worked.
But in that case we do a HostAlloc here and another one in the disk store, and then copy the buffer to this one. I changed it to just take over the buffer from the disk store to eliminate the extra alloc/copy.
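Roughly, the copy path ends up looking like this sketch (the names size, deviceBuffer, and the cast are illustrative assumptions, not the exact code):

val hostBuffer = if (other.storageTier == StorageTier.DISK) {
  // The disk store just HostAlloc'd this buffer while reading the spilled
  // bytes back in, so take ownership of it directly: no second allocation,
  // no copy.
  other.getMemoryBuffer.asInstanceOf[HostMemoryBuffer]
} else {
  // Coming from the device store: allocate a host buffer and copy into it.
  closeOnExcept(HostAlloc.alloc(size)) { hostBuf =>
    hostBuf.copyFromMemoryBuffer(0, deviceBuffer, 0, size, stream)
    hostBuf
  }
}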
    rapidsBuffer.getHostMemoryBuffer
  }
}
I did not remove withHostBufferReadOnly/withHostBufferWriteLock yet, but I don't think we need them if we switch to this alternate approach?
I'd remove them if they are not useful
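For reference, the alternate approach reduces caller code to something like this sketch (withResource from the plugin's Arm helpers; the caller owns and must close the returned buffer):

// The buffer is pinned unspillable while the caller holds it, and becomes
// spillable again once it is closed.
withResource(spillableHostBuffer.getHostBuffer()) { hostBuf =>
  // ... read from or write to hostBuf ...
}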
I have tested this by running the existing unit and integration tests, and I have also been using NDS queries to test this. I am able to force a lot of
  updateTiers(BufferSpill(buffer, Some(newBuffer)))
  buffer.safeFree()
  newBuffer
}.get // the GPU store has to return a buffer here for now, or throw OOM
- }.get // the GPU store has to return a buffer here for now, or throw OOM
+ }.get // the host store has to return a buffer here for now, or throw OOM
This LGTM @jbrennan333 |
- try (
-     SpillableHostBuffer sdb = bufsAndNumRows._1[0];
+ int used[];
+ try (SpillableHostBuffer sdb = bufsAndNumRows._1[0];
Not introduced by this PR, but since it touches them, can we update these variables to be more mnemonic than sdb and sob to improve code readability?
build
still trying to understand big picture, just nits
@@ -208,6 +209,23 @@ public ColumnarBatch next() {
    }
  }

private HostMemoryBuffer[] getHostBuffersWithRetry(SpillableHostBuffer sdb, SpillableHostBuffer sob) {
- private HostMemoryBuffer[] getHostBuffersWithRetry(SpillableHostBuffer sdb, SpillableHostBuffer sob) {
+ private HostMemoryBuffer[] getHostBuffersWithRetry(SpillableHostBuffer spillableDataBuffer, SpillableHostBuffer spillableOffsetsBuffer) {
HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null };
try {
  hBufs[0] = sdb.getHostBuffer();
  hBufs[1] = sob.getHostBuffer();
  return hBufs;
} finally {
  // If the second buffer is null, we must have thrown, so close the first one.
  if ((hBufs[1] == null) && (hBufs[0] != null)) {
    hBufs[0].close();
    hBufs[0] = null;
  }
}
Do we have to test for the exception implicitly? Would this also work?
HostMemoryBuffer dataBuffer = spillableDataBuffer.getHostBuffer();
HostMemoryBuffer offsetsBuffer = null;
try {
  offsetsBuffer = spillableOffsetsBuffer.getHostBuffer();
} catch (Throwable t) {
  dataBuffer.close();
  throw t;
}
return new HostMemoryBuffer[] { dataBuffer, offsetsBuffer };
Thanks @gerashegalov. I have simplified this by just doing a try-with-resources for both buffers and incrementing the refcounts in the body so they are retained if nothing throws.
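In the Java iterator that is a try-with-resources; the same shape as a Scala sketch, assuming cudf's incRefCount on the buffers:

withResource(sdb.getHostBuffer()) { dataBuf =>
  withResource(sob.getHostBuffer()) { offsetsBuf =>
    // Bump the refcounts only after both acquisitions succeed; the closes at
    // the end of each withResource then just drop the counts back, leaving
    // live buffers for the caller. If the second getHostBuffer throws, the
    // outer withResource closes the first buffer for real.
    dataBuf.incRefCount()
    offsetsBuf.incRefCount()
    Array(dataBuf, offsetsBuf)
  }
}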
build
This fixes #10004

This changes the host memory spill to look more like SpillableColumnarBatch. Instead of using withHostMemoryReadOnly and withHostMemoryWriteLock, we can instead just use SpillableHostBuffer.getHostBuffer().

This also changes RapidsDiskBuffer.getMemoryBuffer to no longer retain a HostMemoryBuffer, and to use HostAlloc.alloc() for obtaining the HostMemoryBuffer it returns. This may require callers to handle retries.

I have also made changes to InternalRowToColumnarBatchIterator to use this new interface.

I am putting this up as a draft so I can get some feedback on the approach.