diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index 1d331039e5b59..c20fab83c3460 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -36,7 +36,7 @@ /** * A {@link ManagedBuffer} backed by a segment in a file. */ -public class FileSegmentManagedBuffer extends ManagedBuffer { +public final class FileSegmentManagedBuffer extends ManagedBuffer { private final TransportConf conf; private final File file; private final long offset; diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 8de204070d0f9..ee35060926555 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -65,7 +65,8 @@ final class ShuffleBlockFetcherIterator( maxBytesInFlight: Long, maxReqsInFlight: Int, maxReqSizeShuffleToMem: Long, - detectCorrupt: Boolean) extends Iterator[(BlockId, InputStream)] with Logging { + detectCorrupt: Boolean) + extends Iterator[(BlockId, InputStream)] with Logging { import ShuffleBlockFetcherIterator._ @@ -161,7 +162,7 @@ final class ShuffleBlockFetcherIterator( while (iter.hasNext) { val result = iter.next() result match { - case SuccessFetchResult(bId, address, _, buf, _) => + case SuccessFetchResult(_, address, _, buf, _) => if (address != blockManager.blockManagerId) { shuffleMetrics.incRemoteBytesRead(buf.size) shuffleMetrics.incRemoteBlocksFetched(1) diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 43a0dbcf7cc59..1f813a909fb8b 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import java.io.{File, InputStream, IOException} +import java.util.UUID import java.util.concurrent.Semaphore import scala.concurrent.ExecutionContext.Implicits.global @@ -132,6 +133,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT verify(mockBuf, times(1)).release() verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() } + // 3 local blocks, and 2 remote blocks // (but from the same block manager so one call to fetchBlocks) verify(blockManager, times(3)).getBlockData(any()) @@ -418,7 +420,10 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT doReturn(localBmId).when(blockManager).blockManagerId val diskBlockManager = mock(classOf[DiskBlockManager]) - doReturn(new File("shuffle-read-file")).when(diskBlockManager).getFile(any(classOf[String])) + doReturn{ + var blockId = new TempLocalBlockId(UUID.randomUUID()) + (blockId, new File(blockId.name)) + }.when(diskBlockManager).createTempLocalBlock() doReturn(diskBlockManager).when(blockManager).diskBlockManager val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) diff --git a/docs/configuration.md b/docs/configuration.md index 807583c43057d..0771e36f80b50 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -521,7 +521,7 @@ Apart from these, the following properties are also available, and may be useful
spark.reducer.maxReqSizeShuffleToMem