Skip to content

Commit

Permalink
[#1358] fix(spark): pre-check bytebuffer whether is direct before unc…
Browse files Browse the repository at this point in the history
…ompress
  • Loading branch information
zuston committed Dec 8, 2023
1 parent c5be58f commit e15a1c6
Showing 1 changed file with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,28 @@ public boolean hasNext() {
return recordsIterator.hasNext();
}

private boolean isSameMemoryType(ByteBuffer left, ByteBuffer right) {
if (left.isDirect() && right.isDirect()) {
return true;
}

if (!left.isDirect() && !right.isDirect()) {
return true;
}

return false;
}

private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
long rawDataLength = rawData.limit() - rawData.position();
totalRawBytesLength += rawDataLength;
shuffleReadMetrics.incRemoteBytesRead(rawDataLength);

int uncompressedLen = rawBlock.getUncompressLength();
if (codec != null) {
if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
if (uncompressedData == null
|| uncompressedData.capacity() < uncompressedLen
|| !isSameMemoryType(uncompressedData, rawData)) {
if (uncompressedData != null) {
RssUtils.releaseByteBuffer(uncompressedData);
}
Expand Down

0 comments on commit e15a1c6

Please sign in to comment.