Improve internal row to columnar host memory by using a combined spillable buffer (#10450)

Signed-off-by: Jim Brennan <jimb@nvidia.com>
jbrennan333 authored Feb 21, 2024
1 parent 0bf3c28 commit d2c9ed4
Showing 2 changed files with 32 additions and 117 deletions.
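
The core of the change is to replace the two separate host allocations (one buffer for the row data, one for the offsets) with a single allocation that is registered with the spill framework as one SpillableHostBuffer and then sliced into a data region and an offsets region. Below is a minimal sketch of that layout idea using plain java.nio.ByteBuffer as a stand-in for cuDF's HostMemoryBuffer; the class and method names are illustrative only, not the plugin's API.

import java.nio.ByteBuffer;

public class CombinedBufferSketch {
  // Allocate one backing buffer sized for both regions, then expose two views:
  // bytes [0, dataLen) hold row data, bytes [dataLen, dataLen + offsetLen) hold offsets.
  static ByteBuffer[] sliceDataAndOffsets(int dataLen, int offsetLen) {
    ByteBuffer combined = ByteBuffer.allocateDirect(dataLen + offsetLen);

    ByteBuffer data = combined.duplicate();
    data.position(0);
    data.limit(dataLen);
    ByteBuffer dataView = data.slice();        // view over the data region

    ByteBuffer offsets = combined.duplicate();
    offsets.position(dataLen);
    offsets.limit(dataLen + offsetLen);
    ByteBuffer offsetsView = offsets.slice();  // view over the offsets region

    return new ByteBuffer[] { dataView, offsetsView };
  }
}

Allocating once means only one buffer has to be made spillable and only one allocation can fail, which is why the separate allocBuffersWithRestore bookkeeping is removed in the diff below.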
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -128,31 +128,32 @@ public ColumnarBatch next() {
Tuple2<SpillableColumnarBatch, NvtxRange> batchAndRange;
AutoCloseableTargetSize numRowsWrapper =
new AutoCloseableTargetSize(numRowsEstimate, 1);
Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize> bufsAndNumRows;
Tuple2<SpillableHostBuffer, AutoCloseableTargetSize> sBufAndNumRows;

// The row formatted data is stored as a column of lists of bytes. The current java CUDF APIs
// don't do a great job from a performance standpoint with building this type of data structure
// and we want this to be as efficient as possible so we are going to allocate two host memory
// buffers. One will be for the byte data and the second will be for the offsets. We will then
// write the data directly into those buffers using code generation in a child of this class.
// that implements fillBatch.
bufsAndNumRows =
sBufAndNumRows =
// Starting with initial num rows estimate, this retry block will recalculate the buffer
// sizes from the rows estimate, which is split in half if we get a split and retry oom,
// until we succeed or hit the min of 1 row.
// until we succeed or hit the min of 1 row. It allocates one spillable host buffer for
// both data and offsets.
RmmRapidsRetryIterator.withRetry(numRowsWrapper,
RmmRapidsRetryIterator.splitTargetSizeInHalfCpu(), (numRows) -> {
return allocBuffersWithRestore(numRows);
return allocBuffer(numRows);
}).next();
// Update our estimate for number of rows with the final size used to allocate the buffers.
numRowsEstimate = (int) bufsAndNumRows._2.targetSize();
numRowsEstimate = (int) sBufAndNumRows._2.targetSize();
long dataLength = calcDataLengthEstimate(numRowsEstimate);
long offsetLength = calcOffsetLengthEstimate(numRowsEstimate);
int used[];
try (SpillableHostBuffer spillableDataBuffer = bufsAndNumRows._1[0];
SpillableHostBuffer spillableOffsetsBuffer = bufsAndNumRows._1[1];
try (SpillableHostBuffer spillableBuffer = sBufAndNumRows._1;
) {
HostMemoryBuffer[] hBufs =
getHostBuffersWithRetry(spillableDataBuffer, spillableOffsetsBuffer);
getHostBuffersWithRetry(spillableBuffer, dataLength, offsetLength);
try(HostMemoryBuffer dataBuffer = hBufs[0];
HostMemoryBuffer offsetsBuffer = hBufs[1];
) {
@@ -210,11 +211,14 @@ public ColumnarBatch next() {
}

private HostMemoryBuffer[] getHostBuffersWithRetry(
SpillableHostBuffer spillableDataBuffer, SpillableHostBuffer spillableOffsetsBuffer) {
SpillableHostBuffer spillableBuffer, long dataLength, long offsetLength) {
return RmmRapidsRetryIterator.withRetryNoSplit( () -> {
try (HostMemoryBuffer dataBuffer = spillableDataBuffer.getHostBuffer();
HostMemoryBuffer offsetsBuffer = spillableOffsetsBuffer.getHostBuffer();
) {
// One SpillableHostBuffer is used for both data and offsets. Slice it into the
// two separate HostMemoryBuffers.
try (HostMemoryBuffer dataOffsetBuffer = spillableBuffer.getHostBuffer();
HostMemoryBuffer dataBuffer = dataOffsetBuffer.slice(0, dataLength);
HostMemoryBuffer offsetsBuffer = dataOffsetBuffer.slice(dataLength, offsetLength);
) {
// Increment these to keep them.
dataBuffer.incRefCount();
offsetsBuffer.incRefCount();
@@ -223,63 +227,25 @@ private HostMemoryBuffer[] getHostBuffersWithRetry(
});
}

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffers(SpillableHostBuffer[] sBufs, AutoCloseableTargetSize numRowsWrapper) {
HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null };
private Tuple2<SpillableHostBuffer, AutoCloseableTargetSize>
allocBuffer(AutoCloseableTargetSize numRowsWrapper) {
long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize());
long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize());
HostMemoryBuffer hBuf = null;
try {
long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize());
long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize());
hBufs[0] = HostAlloc$.MODULE$.alloc(dataBytes, true);
sBufs[0] = SpillableHostBuffer$.MODULE$.apply(hBufs[0], hBufs[0].getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[0] = null; // Was closed by spillable
hBufs[1] = HostAlloc$.MODULE$.alloc(offsetBytes, true);
sBufs[1] = SpillableHostBuffer$.MODULE$.apply(hBufs[1], hBufs[1].getLength(),
hBuf = HostAlloc$.MODULE$.alloc((dataBytes + offsetBytes),true);
SpillableHostBuffer sBuf = SpillableHostBuffer$.MODULE$.apply(hBuf, hBuf.getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[1] = null; // Was closed by spillable
return Tuple2.apply(sBufs, numRowsWrapper);
hBuf = null; // taken over by spillable host buffer
return Tuple2.apply(sBuf, numRowsWrapper);
} finally {
// Make sure host buffers are always closed
for (int i = 0; i < hBufs.length; i++) {
if (hBufs[i] != null) {
hBufs[i].close();
hBufs[i] = null;
}
}
// If the second spillable buffer is null, we must have thrown,
// so we need to close the first one in case this is not a retry exception.
// Restore on retry is handled by the caller.
if ((sBufs[1] == null) && (sBufs[0] != null)) {
sBufs[0].close();
sBufs[0] = null;
if (hBuf != null) {
hBuf.close();
}
}
}

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffersWithRestore(AutoCloseableTargetSize numRows) {
SpillableHostBuffer[] spillableBufs = new SpillableHostBuffer[]{ null, null};
Retryable retryBufs = new Retryable() {
@Override
public void checkpoint() {}
@Override
public void restore() {
for (int i = 0; i < spillableBufs.length; i++) {
if (spillableBufs[i] != null) {
spillableBufs[i].close();
spillableBufs[i] = null;
}
}
}
};

return RmmRapidsRetryIterator.withRestoreOnRetry(retryBufs, () -> {
return allocBuffers(spillableBufs, numRows);
});
}

/**
* Take our device column of encoded rows and turn it into a spillable columnar batch.
* This allows us to go into a retry block and be able to roll back our work.
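
The retry block in next() above starts from an estimated row count; when the retry framework signals a split-and-retry OOM it halves that estimate (a plain retry re-attempts at the same size), stopping once the estimate reaches one row. A rough, self-contained sketch of that control flow follows; the allocator interface and the use of OutOfMemoryError are stand-ins for illustration, not the plugin's RmmRapidsRetryIterator, and the sketch collapses retry and split-and-retry into a single halving loop for brevity.

import java.util.function.LongFunction;

public class HalvingRetrySketch {
  // Try the allocation at the current row estimate; on an OOM-style failure,
  // halve the estimate and try again, giving up only once the estimate is 1 row.
  static <T> T allocateWithHalving(long numRowsEstimate, LongFunction<T> alloc) {
    long rows = numRowsEstimate;
    while (true) {
      try {
        return alloc.apply(rows);
      } catch (OutOfMemoryError oom) {
        if (rows <= 1) {
          throw oom;                   // cannot shrink the target any further
        }
        rows = Math.max(1, rows / 2);  // split the target size in half
      }
    }
  }
}

In the real code the final estimate is read back from the AutoCloseableTargetSize after the retry, so the dataLength and offsetLength passed to getHostBuffersWithRetry always match the combined buffer that was actually allocated. The test changes below inject CPU OOMs to exercise exactly this path.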
@@ -177,7 +177,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
}
}

test("a retry when allocating dataBuffer is handled") {
test("a retry when allocating host buffer for data and offsets is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
@@ -189,7 +189,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a retry on the allocation of the dataBuffer
// this forces a retry on the allocation of the combined offsets/data buffer
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 0)
withResource(myIter.next()) { devBatch =>
@@ -203,7 +203,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
}
}

test("a retry when allocating offsetsBuffer is handled") {
test("a split and retry when allocating host buffer for data and offsets is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
@@ -215,33 +215,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a retry on the allocation of the offsetBuffer
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 1)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
}
}
assert(!GpuColumnVector.extractBases(batch).exists(_.getRefCount > 0))
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}

test("a split and retry when allocating dataBuffer is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
NoopMetric)) { ctriter =>
val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a split retry on the allocation of the dataBuffer
// this forces a split retry on the allocation of the combined offsets/data buffer
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 0)
withResource(myIter.next()) { devBatch =>
@@ -254,29 +228,4 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}
test("a split and retry when allocating offsetsBuffer is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
NoopMetric)) { ctriter =>
val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a split retry on the allocation of the offsetsBuffer
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 1)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
}
}
assert(!GpuColumnVector.extractBases(batch).exists(_.getRefCount > 0))
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}
}
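
With a single combined allocation there is only one host buffer whose allocation can fail, so the four original tests (retry and split-and-retry for the data buffer and the offsets buffer separately) collapse into the two kept above. As a rough illustration of the behaviour those tests exercise (not the RmmSpark fault-injection mechanism itself), here is how the hypothetical halving sketch from earlier could be driven with a fake allocator that fails once before succeeding.

import java.util.concurrent.atomic.AtomicInteger;

public class HalvingRetrySketchTest {
  public static void main(String[] args) {
    AtomicInteger attempts = new AtomicInteger();
    // Fake allocator: the first attempt throws, the second succeeds at the halved size.
    long rowsUsed = HalvingRetrySketch.<Long>allocateWithHalving(1024, rows -> {
      if (attempts.getAndIncrement() == 0) {
        throw new OutOfMemoryError("injected failure");
      }
      return rows;
    });
    // Run with -ea to enable the asserts.
    assert attempts.get() == 2 : "expected exactly one retry";
    assert rowsUsed == 512 : "expected the row estimate to be halved once";
    System.out.println("succeeded with " + rowsUsed + " rows after a retry");
  }
}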
