
Commit

fix
jinxing committed May 25, 2017
1 parent 2ce2699 commit b07a3b6
Showing 4 changed files with 11 additions and 5 deletions.
FileSegmentManagedBuffer.java
@@ -36,7 +36,7 @@
 /**
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
-public class FileSegmentManagedBuffer extends ManagedBuffer {
+public final class FileSegmentManagedBuffer extends ManagedBuffer {
   private final TransportConf conf;
   private final File file;
   private final long offset;
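One plausible motivation for sealing the class (an assumption; the diff itself gives no rationale) is that the fetch path distinguishes file-backed buffers with a runtime type test, and final guarantees no subclass can pass that test while changing the buffer's behavior. A minimal Scala sketch; BufferChecks and isFileBacked are hypothetical names:

import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}

object BufferChecks {
  // Hypothetical helper: with FileSegmentManagedBuffer final, a successful
  // type test reliably means "backed by a file segment" -- no subclass can
  // match here while overriding the class's I/O behavior.
  def isFileBacked(buf: ManagedBuffer): Boolean =
    buf.isInstanceOf[FileSegmentManagedBuffer]
}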
ShuffleBlockFetcherIterator.scala
@@ -65,7 +65,8 @@ final class ShuffleBlockFetcherIterator(
     maxBytesInFlight: Long,
     maxReqsInFlight: Int,
     maxReqSizeShuffleToMem: Long,
-    detectCorrupt: Boolean) extends Iterator[(BlockId, InputStream)] with Logging {
+    detectCorrupt: Boolean)
+  extends Iterator[(BlockId, InputStream)] with Logging {
 
   import ShuffleBlockFetcherIterator._
 
@@ -161,7 +162,7 @@ final class ShuffleBlockFetcherIterator(
     while (iter.hasNext) {
       val result = iter.next()
       result match {
-        case SuccessFetchResult(bId, address, _, buf, _) =>
+        case SuccessFetchResult(_, address, _, buf, _) =>
           if (address != blockManager.blockManagerId) {
             shuffleMetrics.incRemoteBytesRead(buf.size)
             shuffleMetrics.incRemoteBlocksFetched(1)
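The constructor above threads maxReqSizeShuffleToMem into the fetch path. A minimal sketch of the threshold semantics, consistent with the docs change at the end of this commit; ShuffleFetchSizing and chooseTarget are hypothetical names:

object ShuffleFetchSizing {
  object FetchTarget extends Enumeration { val Memory, Disk = Value }

  // A shuffle request whose total size exceeds maxReqSizeShuffleToMem is
  // fetched to disk instead of being buffered in memory.
  def chooseTarget(reqSize: Long, maxReqSizeShuffleToMem: Long): FetchTarget.Value =
    if (reqSize > maxReqSizeShuffleToMem) FetchTarget.Disk else FetchTarget.Memory
}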
ShuffleBlockFetcherIteratorSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{File, InputStream, IOException}
+import java.util.UUID
 import java.util.concurrent.Semaphore
 
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -132,6 +133,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
       verify(mockBuf, times(1)).release()
       verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
     }
+
     // 3 local blocks, and 2 remote blocks
     // (but from the same block manager so one call to fetchBlocks)
     verify(blockManager, times(3)).getBlockData(any())
@@ -418,7 +420,10 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     doReturn(localBmId).when(blockManager).blockManagerId
 
     val diskBlockManager = mock(classOf[DiskBlockManager])
-    doReturn(new File("shuffle-read-file")).when(diskBlockManager).getFile(any(classOf[String]))
+    doReturn {
+      val blockId = new TempLocalBlockId(UUID.randomUUID())
+      (blockId, new File(blockId.name))
+    }.when(diskBlockManager).createTempLocalBlock()
     doReturn(diskBlockManager).when(blockManager).diskBlockManager
 
     val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
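The new stub returns what DiskBlockManager.createTempLocalBlock is expected to produce: a fresh TempLocalBlockId plus a File named after it. A standalone sketch of that contract, with a simplified stand-in for the real block-id class (the real implementation also guards against name collisions):

import java.io.File
import java.util.UUID

object TempBlockSketch {
  // Simplified stand-in for Spark's TempLocalBlockId: the name is derived
  // from a random UUID, so each temp block maps to a distinct file.
  final case class TempLocalBlockId(id: UUID) {
    def name: String = "temp_local_" + id
  }

  // The contract the test mocks: hand back a unique id and its backing file.
  def createTempLocalBlock(dir: File): (TempLocalBlockId, File) = {
    val blockId = TempLocalBlockId(UUID.randomUUID())
    (blockId, new File(dir, blockId.name))
  }
}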
docs/configuration.md (2 changes: 1 addition & 1 deletion)
@@ -521,7 +521,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.reducer.maxReqSizeShuffleToMem</code></td>
-  <td>200 * 1024 * 1024</td>
+  <td>200m</td>
   <td>
     The blocks of a shuffle request will be fetched to disk when the size of the request is above
     this threshold. This is to avoid a giant request taking too much memory.
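With the default now written as a byte string, the property can be set with a size suffix instead of a raw byte count. A brief usage sketch (app name and value are illustrative):

import org.apache.spark.SparkConf

object ConfExample {
  // "200m" is parsed as 200 * 1024 * 1024 bytes, matching the old default.
  val conf: SparkConf = new SparkConf()
    .setAppName("shuffle-fetch-to-disk-demo")
    .set("spark.reducer.maxReqSizeShuffleToMem", "200m")
}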
