Improve internal row to columnar host memory by using a combined spillable buffer #10450

Merged
merged 1 commit on Feb 21, 2024
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -128,31 +128,32 @@ public ColumnarBatch next() {
Tuple2<SpillableColumnarBatch, NvtxRange> batchAndRange;
AutoCloseableTargetSize numRowsWrapper =
new AutoCloseableTargetSize(numRowsEstimate, 1);
Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize> bufsAndNumRows;
Tuple2<SpillableHostBuffer, AutoCloseableTargetSize> sBufAndNumRows;

// The row formatted data is stored as a column of lists of bytes. The current java CUDF APIs
// don't do a great job from a performance standpoint with building this type of data structure
// and we want this to be as efficient as possible so we are going to allocate two host memory
// buffers. One will be for the byte data and the second will be for the offsets. We will then
// write the data directly into those buffers using code generation in a child of this class.
// that implements fillBatch.
bufsAndNumRows =
sBufAndNumRows =
// Starting with initial num rows estimate, this retry block will recalculate the buffer
// sizes from the rows estimate, which is split in half if we get a split and retry oom,
// until we succeed or hit the min of 1 row.
// until we succeed or hit the min of 1 row. It allocates one spillable host buffer for
// both data and offsets.
RmmRapidsRetryIterator.withRetry(numRowsWrapper,
RmmRapidsRetryIterator.splitTargetSizeInHalfCpu(), (numRows) -> {
return allocBuffersWithRestore(numRows);
return allocBuffer(numRows);
}).next();
// Update our estimate for number of rows with the final size used to allocate the buffers.
numRowsEstimate = (int) bufsAndNumRows._2.targetSize();
numRowsEstimate = (int) sBufAndNumRows._2.targetSize();
long dataLength = calcDataLengthEstimate(numRowsEstimate);
long offsetLength = calcOffsetLengthEstimate(numRowsEstimate);
int used[];
try (SpillableHostBuffer spillableDataBuffer = bufsAndNumRows._1[0];
SpillableHostBuffer spillableOffsetsBuffer = bufsAndNumRows._1[1];
try (SpillableHostBuffer spillableBuffer = sBufAndNumRows._1;
) {
HostMemoryBuffer[] hBufs =
getHostBuffersWithRetry(spillableDataBuffer, spillableOffsetsBuffer);
getHostBuffersWithRetry(spillableBuffer, dataLength, offsetLength);
try(HostMemoryBuffer dataBuffer = hBufs[0];
HostMemoryBuffer offsetsBuffer = hBufs[1];
) {
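Note on the layout this hunk introduces: the two separate host buffers described in the comment are replaced by one allocation that holds the data region followed by the offsets region. The sketch below shows just that layout in isolation. It is a minimal sketch, assuming only the cuDF Java bindings (ai.rapids.cudf.HostMemoryBuffer); the sizes are placeholder values, and the plugin's spillable wrapper and retry machinery are left out.

```java
import ai.rapids.cudf.HostMemoryBuffer;

public class CombinedBufferSketch {
  public static void main(String[] args) {
    // Placeholder estimates; the real code derives these from the row count estimate.
    long dataLength = 1024;
    long offsetLength = 128;
    // One host allocation sized for both regions, laid out as [data | offsets].
    try (HostMemoryBuffer combined = HostMemoryBuffer.allocate(dataLength + offsetLength);
         // Slices are views into the same allocation; no copies are made.
         HostMemoryBuffer data = combined.slice(0, dataLength);
         HostMemoryBuffer offsets = combined.slice(dataLength, offsetLength)) {
      data.setLong(0, 42L);   // write into the data region
      offsets.setInt(0, 0);   // write into the offsets region
    } // closing the slices and then the parent releases the single allocation
  }
}
```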
@@ -210,11 +211,14 @@ public ColumnarBatch next() {
}

private HostMemoryBuffer[] getHostBuffersWithRetry(
SpillableHostBuffer spillableDataBuffer, SpillableHostBuffer spillableOffsetsBuffer) {
SpillableHostBuffer spillableBuffer, long dataLength, long offsetLength) {
return RmmRapidsRetryIterator.withRetryNoSplit( () -> {
try (HostMemoryBuffer dataBuffer = spillableDataBuffer.getHostBuffer();
HostMemoryBuffer offsetsBuffer = spillableOffsetsBuffer.getHostBuffer();
) {
// One SpillableHostBuffer is used for both data and offsets. Slice it into the
// two separate HostMemoryBuffers.
try (HostMemoryBuffer dataOffsetBuffer = spillableBuffer.getHostBuffer();
HostMemoryBuffer dataBuffer = dataOffsetBuffer.slice(0, dataLength);
HostMemoryBuffer offsetsBuffer = dataOffsetBuffer.slice(dataLength, offsetLength);
) {
// Increment these to keep them.
dataBuffer.incRefCount();
offsetsBuffer.incRefCount();
@@ -223,63 +227,25 @@ private HostMemoryBuffer[] getHostBuffersWithRetry(
});
}
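Note on the ref counting in getHostBuffersWithRetry above: the try-with-resources block closes the parent buffer and both slices when it exits, so the slices handed back to the caller must take an extra reference first. Below is a minimal sketch of that handoff, again assuming only the cuDF Java bindings; sliceAndKeep is a hypothetical helper, not a method from the PR.

```java
import ai.rapids.cudf.HostMemoryBuffer;

public class SliceHandoffSketch {
  // Return two slices of a combined buffer that outlive this method's try block.
  static HostMemoryBuffer[] sliceAndKeep(HostMemoryBuffer combined, long dataLen, long offLen) {
    try (HostMemoryBuffer data = combined.slice(0, dataLen);
         HostMemoryBuffer offsets = combined.slice(dataLen, offLen)) {
      // The try block closes both slices on exit; the extra reference taken here
      // keeps the returned handles valid until the caller closes them.
      data.incRefCount();
      offsets.incRefCount();
      return new HostMemoryBuffer[]{data, offsets};
    }
  }
}
```

In the actual change this work sits inside RmmRapidsRetryIterator.withRetryNoSplit, so a host OOM while materializing the spillable buffer can be retried without splitting the target size.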

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffers(SpillableHostBuffer[] sBufs, AutoCloseableTargetSize numRowsWrapper) {
HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null };
private Tuple2<SpillableHostBuffer, AutoCloseableTargetSize>
allocBuffer(AutoCloseableTargetSize numRowsWrapper) {
long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize());
long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize());
HostMemoryBuffer hBuf = null;
try {
long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize());
long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize());
hBufs[0] = HostAlloc$.MODULE$.alloc(dataBytes, true);
sBufs[0] = SpillableHostBuffer$.MODULE$.apply(hBufs[0], hBufs[0].getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[0] = null; // Was closed by spillable
hBufs[1] = HostAlloc$.MODULE$.alloc(offsetBytes, true);
sBufs[1] = SpillableHostBuffer$.MODULE$.apply(hBufs[1], hBufs[1].getLength(),
hBuf = HostAlloc$.MODULE$.alloc((dataBytes + offsetBytes),true);
SpillableHostBuffer sBuf = SpillableHostBuffer$.MODULE$.apply(hBuf, hBuf.getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[1] = null; // Was closed by spillable
return Tuple2.apply(sBufs, numRowsWrapper);
hBuf = null; // taken over by spillable host buffer
return Tuple2.apply(sBuf, numRowsWrapper);
} finally {
// Make sure host buffers are always closed
for (int i = 0; i < hBufs.length; i++) {
if (hBufs[i] != null) {
hBufs[i].close();
hBufs[i] = null;
}
}
// If the second spillable buffer is null, we must have thrown,
// so we need to close the first one in case this is not a retry exception.
// Restore on retry is handled by the caller.
if ((sBufs[1] == null) && (sBufs[0] != null)) {
sBufs[0].close();
sBufs[0] = null;
if (hBuf != null) {
hBuf.close();
}
}
}

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffersWithRestore(AutoCloseableTargetSize numRows) {
SpillableHostBuffer[] spillableBufs = new SpillableHostBuffer[]{ null, null};
Retryable retryBufs = new Retryable() {
@Override
public void checkpoint() {}
@Override
public void restore() {
for (int i = 0; i < spillableBufs.length; i++) {
if (spillableBufs[i] != null) {
spillableBufs[i].close();
spillableBufs[i] = null;
}
}
}
};

return RmmRapidsRetryIterator.withRestoreOnRetry(retryBufs, () -> {
return allocBuffers(spillableBufs, numRows);
});
}

/**
* Take our device column of encoded rows and turn it into a spillable columnar batch.
* This allows us to go into a retry block and be able to roll back our work.
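Note on allocBuffer's finally block above: once the SpillableHostBuffer takes over the freshly allocated buffer, the local handle is nulled, so the finally block only closes the buffer when the hand-off did not happen (for example, if wrapping threw). A minimal sketch of that ownership-transfer pattern follows, with a hypothetical Wrapper class standing in for SpillableHostBuffer.

```java
import ai.rapids.cudf.HostMemoryBuffer;

public class OwnershipTransferSketch {
  // Hypothetical stand-in for SpillableHostBuffer: it owns the buffer once constructed.
  static final class Wrapper implements AutoCloseable {
    private final HostMemoryBuffer owned;
    Wrapper(HostMemoryBuffer buf) { this.owned = buf; }
    @Override public void close() { owned.close(); }
  }

  static Wrapper allocWrapped(long dataBytes, long offsetBytes) {
    HostMemoryBuffer hBuf = null;
    try {
      hBuf = HostMemoryBuffer.allocate(dataBytes + offsetBytes);
      Wrapper wrapped = new Wrapper(hBuf);
      hBuf = null; // ownership transferred; the wrapper closes it from now on
      return wrapped;
    } finally {
      if (hBuf != null) {
        hBuf.close(); // only reached if the hand-off above did not complete
      }
    }
  }
}
```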
@@ -177,7 +177,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
}
}

test("a retry when allocating dataBuffer is handled") {
test("a retry when allocating host buffer for data and offsets is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
@@ -189,7 +189,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a retry on the allocation of the dataBuffer
// this forces a retry on the allocation of the combined offsets/data buffer
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 0)
withResource(myIter.next()) { devBatch =>
@@ -203,7 +203,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
}
}

test("a retry when allocating offsetsBuffer is handled") {
test("a split and retry when allocating host buffer for data and offsets is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
@@ -215,33 +215,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a retry on the allocation of the offsetBuffer
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 1)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
}
}
assert(!GpuColumnVector.extractBases(batch).exists(_.getRefCount > 0))
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}

test("a split and retry when allocating dataBuffer is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
NoopMetric)) { ctriter =>
val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a split retry on the allocation of the dataBuffer
// this forces a split retry on the allocation of the combined offsets/data buffer
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 0)
withResource(myIter.next()) { devBatch =>
@@ -254,29 +228,4 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}
test("a split and retry when allocating offsetsBuffer is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
NoopMetric)) { ctriter =>
val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a split retry on the allocation of the offsetsBuffer
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 1)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
}
}
assert(!GpuColumnVector.extractBases(batch).exists(_.getRefCount > 0))
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}
}
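Note on the retry behavior these tests exercise: a forced split-and-retry OOM on the single combined allocation makes the iterator halve its row estimate and try again, down to a minimum of one row. The sketch below shows only that halving arithmetic; the Allocator interface, the bytes-per-row figure, and the one-int-per-row offsets layout are assumptions for illustration, and the real code uses RmmRapidsRetryIterator with its own retry exception types rather than catching OutOfMemoryError.

```java
public class SplitRetrySketch {
  // Hypothetical allocator that may fail with an OutOfMemoryError.
  interface Allocator {
    void alloc(long dataBytes, long offsetBytes);
  }

  // Halve the row estimate on each failed attempt until the allocation fits or we reach 1 row.
  static int allocateWithHalving(int numRowsEstimate, int bytesPerRow, Allocator allocator) {
    int numRows = numRowsEstimate;
    while (true) {
      long dataBytes = (long) numRows * bytesPerRow;
      long offsetBytes = (long) (numRows + 1) * Integer.BYTES; // assumed int offsets
      try {
        allocator.alloc(dataBytes, offsetBytes); // one combined request, as in the PR
        return numRows;                          // this estimate worked
      } catch (OutOfMemoryError oom) {
        if (numRows <= 1) {
          throw oom;                             // already at the minimum of 1 row
        }
        numRows = numRows / 2;                   // split the target size in half and retry
      }
    }
  }
}
```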