Commit 3670f00

Code review feedback.
rxin committed Aug 19, 2014
1 parent c1185cd commit 3670f00
Showing 1 changed file with 5 additions and 5 deletions.
@@ -54,8 +54,6 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
-  override protected def getValue() = _value
-
   /**
    * Value of the broadcast object. On driver, this is set directly by the constructor.
    * On executors, this is reconstructed by [[readObject]], which builds this value by reading
@@ -68,6 +66,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
   private val broadcastId = BroadcastBlockId(id)
 
+  override protected def getValue() = _value
+
   /**
    * Divide the object into multiple blocks and put those blocks in the block manager.
    *
@@ -79,7 +79,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       // TODO: Use putBytes directly.
       SparkEnv.get.blockManager.putSingle(
         BroadcastBlockId(id, "piece" + i),
-        blocks(i),
+        block,
         StorageLevel.MEMORY_AND_DISK_SER,
         tellMaster = true)
     }
@@ -89,7 +89,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   /** Fetch torrent blocks from the driver and/or other executors. */
   private def readBlocks(): Array[Array[Byte]] = {
     // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
-    // to the driver, so other executors can pull these thunks from this executor as well.
+    // to the driver, so other executors can pull these chunks from this executor as well.
     var numBlocksAvailable = 0
     val blocks = new Array[Array[Byte]](numBlocks)
 
@@ -149,7 +149,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
           logInfo("Reading broadcast variable " + id + " took " + time + " s")
 
           _value = TorrentBroadcast.unBlockifyObject[T](blocks)
-          // Store the merged copy in BlockManager so other tasks on this executor doesn't
+          // Store the merged copy in BlockManager so other tasks on this executor don't
           // need to re-fetch it.
          SparkEnv.get.blockManager.putSingle(
            broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
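
The doc comments touched by this diff describe the core mechanism: the broadcast value is divided into blocks that are stored in the BlockManager, and executors later reassemble the value via TorrentBroadcast.unBlockifyObject. Below is a minimal, self-contained sketch of that blockify/unblockify round trip, using plain Java serialization and an assumed fixed block size; the real code goes through Spark's configured serializer and stores each piece in the BlockManager under a BroadcastBlockId, so treat this only as an illustration of the idea, not Spark's implementation.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object BlockifySketch {
  // Assumed block size for this sketch; the real block size is configurable in Spark.
  val blockSize: Int = 4 * 1024 * 1024

  // Serialize an object with Java serialization and split the bytes into fixed-size blocks.
  def blockifyObject[T](obj: T): Array[Array[Byte]] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    oos.close()
    bos.toByteArray.grouped(blockSize).toArray
  }

  // Concatenate the blocks in order and deserialize the original object.
  def unBlockifyObject[T](blocks: Array[Array[Byte]]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(blocks.flatten))
    ois.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val original = (1 to 10000).toList
    val blocks = blockifyObject(original)
    val restored = unBlockifyObject[List[Int]](blocks)
    assert(restored == original)
    println(s"Round-tripped ${original.size} ints through ${blocks.length} block(s)")
  }
}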
