From e15a1c6d2797977f9c19d5574b5f364b19b9757d Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 8 Dec 2023 15:15:02 +0800 Subject: [PATCH] [#1358] fix(spark): pre-check bytebuffer whether is direct before uncompress --- .../shuffle/reader/RssShuffleDataIterator.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java index 38ff12a2f8..d32227add3 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java @@ -150,6 +150,18 @@ public boolean hasNext() { return recordsIterator.hasNext(); } + private boolean isSameMemoryType(ByteBuffer left, ByteBuffer right) { + if (left.isDirect() && right.isDirect()) { + return true; + } + + if (!left.isDirect() && !right.isDirect()) { + return true; + } + + return false; + } + private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) { long rawDataLength = rawData.limit() - rawData.position(); totalRawBytesLength += rawDataLength; @@ -157,7 +169,9 @@ private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) { int uncompressedLen = rawBlock.getUncompressLength(); if (codec != null) { - if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) { + if (uncompressedData == null + || uncompressedData.capacity() < uncompressedLen + || !isSameMemoryType(uncompressedData, rawData)) { if (uncompressedData != null) { RssUtils.releaseByteBuffer(uncompressedData); }