Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jbrennan333 committed Dec 11, 2023
1 parent 15cef09 commit b73fbd8
Showing 1 changed file with 38 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ protected InternalRowToColumnarBatchIterator(
}

private int calcNumRowsEstimate(long targetBytes) {
return (int) Math.max(1,
Math.min(Integer.MAX_VALUE - 1, targetBytes / sizePerRowEstimate));
return Math.max(1,
Math.min(Integer.MAX_VALUE - 1, (int) (targetBytes / sizePerRowEstimate)));
}

private long calcDataLengthEstimate(int numRows) {
return ((long) sizePerRowEstimate) * numRows;
}
Expand Down Expand Up @@ -208,27 +209,43 @@ public ColumnarBatch next() {
}

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffers(HostMemoryBuffer[] hBufs, SpillableHostBuffer[] sBufs,
AutoCloseableTargetSize numRowsWrapper) {
long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize());
long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize());
hBufs[0] = HostAlloc$.MODULE$.alloc(dataBytes, true);
sBufs[0] = SpillableHostBuffer$.MODULE$.apply(hBufs[0], hBufs[0].getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[0] = null; // Was closed by spillable
hBufs[1] = HostAlloc$.MODULE$.alloc(offsetBytes, true);
sBufs[1] = SpillableHostBuffer$.MODULE$.apply(hBufs[1], hBufs[1].getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[1] = null; // Was closed by spillable
return Tuple2.apply(sBufs, numRowsWrapper);
allocBuffers(SpillableHostBuffer[] sBufs, AutoCloseableTargetSize numRowsWrapper) {
HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null };
try {
long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize());
long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize());
hBufs[0] = HostAlloc$.MODULE$.alloc(dataBytes, true);
sBufs[0] = SpillableHostBuffer$.MODULE$.apply(hBufs[0], hBufs[0].getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[0] = null; // Was closed by spillable
hBufs[1] = HostAlloc$.MODULE$.alloc(offsetBytes, true);
sBufs[1] = SpillableHostBuffer$.MODULE$.apply(hBufs[1], hBufs[1].getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[1] = null; // Was closed by spillable
return Tuple2.apply(sBufs, numRowsWrapper);
} finally {
// Make sure host buffers are always closed
for (int i = 0; i < hBufs.length; i++) {
if (hBufs[i] != null) {
hBufs[i].close();
hBufs[i] = null;
}
}
// If the second spillable buffer is null, we must have thrown,
// so we need to close the first one in case this is not a retry exception.
// Restore on retry is handled by the caller.
if ((sBufs[1] == null) && (sBufs[0] != null)) {
sBufs[0].close();
sBufs[0] = null;
}
}
}

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffersWithRestore(AutoCloseableTargetSize numRows) {
HostMemoryBuffer[] hostBufs = new HostMemoryBuffer[]{null, null };
SpillableHostBuffer[] spillableBufs = new SpillableHostBuffer[]{null, null};
SpillableHostBuffer[] spillableBufs = new SpillableHostBuffer[]{ null, null};
Retryable retryBufs = new Retryable() {
@Override
public void checkpoint() {}
Expand All @@ -239,15 +256,12 @@ public void restore() {
spillableBufs[i].close();
spillableBufs[i] = null;
}
if (hostBufs[i] != null) {
hostBufs[i].close();
hostBufs[i] = null;
}
}
}
};

return RmmRapidsRetryIterator.withRestoreOnRetry(retryBufs, () -> {
return allocBuffers(hostBufs, spillableBufs, numRows);
return allocBuffers(spillableBufs, numRows);
});
}

Expand Down

0 comments on commit b73fbd8

Please sign in to comment.