Skip to content

Commit

Permalink
Added getBytes to BlockManager and uses that in TorrentBroadcast.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Aug 19, 2014
1 parent 2d6a5fb commit 0d8ed5b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
private def readBlocks(): Array[ByteBuffer] = {
// Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
// to the driver, so other executors can pull these chunks from this executor as well.
var numBlocksAvailable = 0
val blocks = new Array[ByteBuffer](numBlocks)

for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)
SparkEnv.get.blockManager.getRemoteBytes(pieceId) match {
case Some(x) =>
blocks(pid) = x.asInstanceOf[ByteBuffer]
numBlocksAvailable += 1
// Note that we use getBytes rather than getRemoteBytes here because there is a chance
// that previous attempts to fetch the broadcast blocks have already fetched some of the
// blocks. In that case, some blocks would be available locally (on this executor).
SparkEnv.get.blockManager.getBytes(pieceId) match {
case Some(block) =>
blocks(pid) = block
SparkEnv.get.blockManager.putBytes(
pieceId,
blocks(pid),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,16 @@ private[spark] class BlockManager(
None
}

def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val local = getLocalBytes(blockId)
if (local.isDefined) {
local
} else {
val remote = getRemoteBytes(blockId)
remote
}
}

/**
* Get a block from the block manager (either local or remote).
*/
Expand Down

0 comments on commit 0d8ed5b

Please sign in to comment.