diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 0ceeb1a524905..3cce7717d8fb9 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -90,18 +90,21 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   private def readBlocks(): Array[ByteBuffer] = {
     // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
     // to the driver, so other executors can pull these chunks from this executor as well.
-    var numBlocksAvailable = 0
     val blocks = new Array[ByteBuffer](numBlocks)
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-      SparkEnv.get.blockManager.getRemoteBytes(pieceId) match {
-        case Some(x) =>
-          blocks(pid) = x.asInstanceOf[ByteBuffer]
-          numBlocksAvailable += 1
+      // Note that we use getBytes rather than getRemoteBytes here because there is a chance
+      // that previous attempts to fetch the broadcast blocks have already fetched some of the
+      // blocks. In that case, some blocks would be available locally (on this executor).
+      // NOTE(review): a block that getBytes found locally is still passed to putBytes below;
+      // confirm that re-putting an already-stored block is intended.
+      SparkEnv.get.blockManager.getBytes(pieceId) match {
+        case Some(block) =>
+          blocks(pid) = block
           SparkEnv.get.blockManager.putBytes(
             pieceId,
-            blocks(pid),
+            block,
             StorageLevel.MEMORY_AND_DISK_SER,
             tellMaster = true)
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e4c3d58905e7f..1d37a29ee0b21 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -517,6 +517,14 @@ private[spark] class BlockManager(
     None
   }
 
+  /**
+   * Get the bytes of a block, trying `getLocalBytes` first and falling back to
+   * `getRemoteBytes` only when the local lookup returns None.
+   */
+  def getBytes(blockId: BlockId): Option[ByteBuffer] = {
+    getLocalBytes(blockId).orElse(getRemoteBytes(blockId))
+  }
+
   /**
    * Get a block from the block manager (either local or remote).
    */