Skip to content

Commit

Permalink
[SPARK-23524] Big local shuffle blocks should not be checked for corr…
Browse files Browse the repository at this point in the history
…uption.
  • Loading branch information
jx158167 committed Feb 28, 2018
1 parent 8077bb0 commit 110c851
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ final class ShuffleBlockFetcherIterator(
private[this] val startTime = System.currentTimeMillis

/** Local blocks to fetch, excluding zero-sized blocks. */
private[this] val localBlocks = new ArrayBuffer[BlockId]()
private[this] val localBlocks = scala.collection.mutable.LinkedHashSet[BlockId]()

/** Remote blocks to fetch, excluding zero-sized blocks. */
private[this] val remoteBlocks = new HashSet[BlockId]()
Expand Down Expand Up @@ -316,6 +316,7 @@ final class ShuffleBlockFetcherIterator(
* track in-memory are the ManagedBuffer references themselves.
*/
private[this] def fetchLocalBlocks() {
logDebug(s"Start fetching local blocks: ${localBlocks.mkString(", ")}")
val iter = localBlocks.iterator
while (iter.hasNext) {
val blockId = iter.next()
Expand All @@ -324,7 +325,8 @@ final class ShuffleBlockFetcherIterator(
shuffleMetrics.incLocalBlocksFetched(1)
shuffleMetrics.incLocalBytesRead(buf.size)
buf.retain()
results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf, false))
results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId,
buf.size(), buf, false))
} catch {
case e: Exception =>
// If we see an exception, stop immediately.
Expand Down Expand Up @@ -397,7 +399,9 @@ final class ShuffleBlockFetcherIterator(
}
shuffleMetrics.incRemoteBlocksFetched(1)
}
bytesInFlight -= size
if (!localBlocks.contains(blockId)) {
bytesInFlight -= size
}
if (isNetworkReqDone) {
reqsInFlight -= 1
logDebug("Number of requests in flight " + reqsInFlight)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
intercept[FetchFailedException] { iterator.next() }
}

test("big corrupt blocks will not be retiried") {
  // A mocked 10000-byte buffer whose input stream throws an IOException on every read.
  val brokenStream = mock(classOf[InputStream])
  when(brokenStream.read(any(), any(), any())).thenThrow(new IOException("corrupt"))
  val brokenBuffer = mock(classOf[ManagedBuffer])
  when(brokenBuffer.createInputStream()).thenReturn(brokenStream)
  doReturn(10000L).when(brokenBuffer).size()

  // Local side: the mocked block manager hands out the broken buffer for the local block.
  val localBlock = ShuffleBlockId(0, 0, 0)
  val localBmId = BlockManagerId("test-client", "test-client", 1)
  val blockManager = mock(classOf[BlockManager])
  doReturn(localBmId).when(blockManager).blockManagerId
  doReturn(brokenBuffer).when(blockManager).getBlockData(localBlock)
  val localBlockLengths = Seq[(BlockId, Long)](localBlock -> brokenBuffer.size())

  // Remote side: a single block of the same size served by a second block manager id.
  val remoteBlock = ShuffleBlockId(0, 1, 0)
  val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
  val remoteBlockLengths = Seq[(BlockId, Long)](remoteBlock -> brokenBuffer.size())

  // Every fetch request is answered asynchronously by reporting the broken buffer as a
  // successful fetch for each requested block id.
  val replyWithBrokenBuffer = new Answer[Unit] {
    override def answer(call: InvocationOnMock): Unit = {
      val callArgs = call.getArguments()
      val fetchListener = callArgs(4).asInstanceOf[BlockFetchingListener]
      val requestedIds = callArgs(3).asInstanceOf[Array[String]]
      Future {
        requestedIds.foreach(id => fetchListener.onBlockFetchSuccess(id, brokenBuffer))
      }
    }
  }
  val transfer = mock(classOf[BlockTransferService])
  when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
    .thenAnswer(replyWithBrokenBuffer)

  val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
    localBmId -> localBlockLengths,
    remoteBmId -> remoteBlockLengths)

  // NOTE(review): 2048 is presumably the bytes-in-flight limit that makes the 10000-byte
  // blocks count as "big", and the trailing `true` presumably enables corruption
  // detection -- confirm against the ShuffleBlockFetcherIterator constructor.
  val taskContext = TaskContext.empty()
  val iterator = new ShuffleBlockFetcherIterator(
    taskContext,
    transfer,
    blockManager,
    blocksByAddress,
    (_, stream) => new LimitedInputStream(stream, 10000),
    2048,
    Int.MaxValue,
    Int.MaxValue,
    Int.MaxValue,
    true)

  // Both blocks must come back from the iterator without an exception being thrown.
  val fetchedIds = collection.mutable.HashSet[BlockId]()
  fetchedIds.add(iterator.next()._1)
  fetchedIds.add(iterator.next()._1)
  val expectedIds = collection.immutable.HashSet(localBlock, remoteBlock)
  assert(fetchedIds == expectedIds)
}

test("retry corrupt blocks (disabled)") {
val blockManager = mock(classOf[BlockManager])
val localBmId = BlockManagerId("test-client", "test-client", 1)
Expand Down

0 comments on commit 110c851

Please sign in to comment.