runOptimize and trim only
yaooqinn committed Nov 16, 2015
1 parent b4b7a1c commit b9cc414
Showing 1 changed file with 12 additions and 25 deletions.
core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -121,31 +121,30 @@ private[spark] class CompressedMapStatus(

 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty(isSparse)/non-empty(!isSparse).
+ * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
- * @param markedBlocks a bitmap tracking which blocks are empty(isSparse)/non-empty(!isSparse)
+ * @param emptyBlocks a bitmap tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
  */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var markedBlocks: RoaringBitmap,
-    private[this] var avgSize: Long,
-    private[this] var isSparse: Boolean)
+    private[this] var emptyBlocks: RoaringBitmap,
+    private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1, false) // For deserialization only
+  protected def this() = this(null, -1, null, -1) // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (isSparse ^ markedBlocks.contains(reduceId)) {
+    if (emptyBlocks.contains(reduceId)) {
       0
     } else {
       avgSize
@@ -154,17 +153,15 @@ private[spark] class HighlyCompressedMapStatus private (

   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    markedBlocks.writeExternal(out)
+    emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
-    out.writeBoolean(isSparse)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    markedBlocks = new RoaringBitmap()
-    markedBlocks.readExternal(in)
+    emptyBlocks = new RoaringBitmap()
+    emptyBlocks.readExternal(in)
     avgSize = in.readLong()
-    isSparse = in.readBoolean()
   }
 }
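
The heart of the change is visible above: the old lookup XOR-ed the isSparse flag with bitmap membership, and the flag also had to be written and read during serialization, while the new code simply asks whether a block id is in emptyBlocks. A minimal, self-contained sketch of why the two emptiness tests agree whenever the bitmap is built by the old sparse/dense rule; it assumes the org.roaringbitmap artifact on the classpath, and LookupEquivalence is an illustrative name, not part of the patch:

import org.roaringbitmap.RoaringBitmap

object LookupEquivalence {
  def main(args: Array[String]): Unit = {
    val sizes = Array[Long](0L, 10L, 0L, 0L, 30L) // blocks 1 and 4 are non-empty

    // New scheme: always track the empty blocks.
    val emptyBlocks = new RoaringBitmap()
    // Old scheme: build both bitmaps, keep whichever side is smaller,
    // and remember which one was kept via isSparse.
    val nonEmptyBlocks = new RoaringBitmap()
    sizes.indices.foreach { i =>
      if (sizes(i) > 0) nonEmptyBlocks.add(i) else emptyBlocks.add(i)
    }
    val isSparse = nonEmptyBlocks.getCardinality < sizes.length / 2
    val markedBlocks = if (isSparse) nonEmptyBlocks else emptyBlocks

    // The two "is this block empty?" tests agree on every block id.
    sizes.indices.foreach { i =>
      val oldTest = isSparse ^ markedBlocks.contains(i)
      val newTest = emptyBlocks.contains(i)
      assert(oldTest == newTest, s"tests disagree at block $i")
    }
    println("old XOR lookup and plain emptyBlocks lookup agree")
  }
}

The patch gives up the smaller bitmap in the dense case in exchange for one unconditional code path through construction, lookup, and serialization.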

@@ -176,13 +173,11 @@ private[spark] object HighlyCompressedMapStatus {
     var numNonEmptyBlocks: Int = 0
     var totalSize: Long = 0
     val emptyBlocks = new RoaringBitmap()
-    val nonEmptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
-        nonEmptyBlocks.add(i)
         totalSize += size
       } else {
         emptyBlocks.add(i)
@@ -194,16 +189,8 @@
     } else {
       0
     }
-    if (numNonEmptyBlocks < totalNumBlocks / 2) {
-      // If non-empty blocks are sparse, we track non-empty blocks and set `isSparse` true.
-      nonEmptyBlocks.runOptimize()
-      nonEmptyBlocks.trim()
-      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
-    } else {
-      // If non-empty blocks are dense, we track empty blocks and set `isSparse` false.
-      emptyBlocks.runOptimize()
-      emptyBlocks.trim()
-      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
-    }
+    emptyBlocks.runOptimize()
+    emptyBlocks.trim()
+    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
   }
 }
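
What the commit keeps, per its title, is the pair of compaction calls on the one remaining bitmap: runOptimize() converts RoaringBitmap's internal containers to run-length encoding wherever that representation is smaller, and trim() releases over-allocated capacity in the backing arrays before the status is serialized and shipped back to the driver. A minimal sketch of the effect, again assuming org.roaringbitmap on the classpath (BitmapCompaction is an illustrative name, not part of the patch):

import org.roaringbitmap.RoaringBitmap

object BitmapCompaction {
  def main(args: Array[String]): Unit = {
    // Long runs of consecutive ids, e.g. a contiguous stretch of empty
    // reduce partitions, are the best case for run-length encoding.
    val bitmap = new RoaringBitmap()
    (0 until 100000).foreach(i => bitmap.add(i))

    val before = bitmap.serializedSizeInBytes()
    bitmap.runOptimize() // switch containers to RLE where that is smaller
    bitmap.trim()        // drop unused capacity in the backing arrays
    val after = bitmap.serializedSizeInBytes()

    println(s"serialized size: $before -> $after bytes")
  }
}

For a run like this the serialized size drops from roughly kilobytes of container data to a few dozen bytes, which is why the compaction is worth keeping even after the sparse/dense branching is gone.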
