From b9cc41439203f6924e75836a39fc603ea0714ceb Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 16 Nov 2015 11:05:51 +0800
Subject: [PATCH] runOptimize and trim only

---
 .../apache/spark/scheduler/MapStatus.scala    | 37 ++++++-------------
 1 file changed, 12 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 03e2062236bc2..42c6788773b7a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -121,31 +121,30 @@ private[spark] class CompressedMapStatus(
 
 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty(isSparse)/non-empty(!isSparse).
+ * plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
- * @param markedBlocks a bitmap tracking which blocks are empty(isSparse)/non-empty(!isSparse)
+ * @param emptyBlocks a bitmap tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
  */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var markedBlocks: RoaringBitmap,
-    private[this] var avgSize: Long,
-    private[this] var isSparse: Boolean)
+    private[this] var emptyBlocks: RoaringBitmap,
+    private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1, false)  // For deserialization only
+  protected def this() = this(null, -1, null, -1)  // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (isSparse ^ markedBlocks.contains(reduceId)) {
+    if (emptyBlocks.contains(reduceId)) {
       0
     } else {
       avgSize
@@ -154,17 +153,15 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    markedBlocks.writeExternal(out)
+    emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
-    out.writeBoolean(isSparse)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    markedBlocks = new RoaringBitmap()
-    markedBlocks.readExternal(in)
+    emptyBlocks = new RoaringBitmap()
+    emptyBlocks.readExternal(in)
     avgSize = in.readLong()
-    isSparse = in.readBoolean()
   }
 }
 
@@ -176,13 +173,11 @@ private[spark] object HighlyCompressedMapStatus {
     var numNonEmptyBlocks: Int = 0
     var totalSize: Long = 0
     val emptyBlocks = new RoaringBitmap()
-    val nonEmptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
-        nonEmptyBlocks.add(i)
         totalSize += size
       } else {
         emptyBlocks.add(i)
@@ -194,16 +189,8 @@
     } else {
       0
     }
-    if (numNonEmptyBlocks < totalNumBlocks / 2) {
-      // If non-empty blocks are sparse, we track non-empty blocks and set `isSparse` true.
-      nonEmptyBlocks.runOptimize()
-      nonEmptyBlocks.trim()
-      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
-    } else {
-      // If non-empty blocks are dense, we track empty blocks and set `isSparse` false.
-      emptyBlocks.runOptimize()
-      emptyBlocks.trim()
-      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
-    }
+    emptyBlocks.runOptimize()
+    emptyBlocks.trim()
+    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
   }
 }
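
Note on the change above: with the isSparse branch dropped, HighlyCompressedMapStatus always tracks empty blocks, and the only remaining size optimization is the RoaringBitmap runOptimize()/trim() pair named in the subject. The standalone sketch below (not part of the patch; the object name, partition counts, and sparsity pattern are made up for illustration) shows what those two calls do on a bitmap shaped like a typical emptyBlocks set, where long runs of consecutive empty reduce partitions compress well as run-length containers:

import org.roaringbitmap.RoaringBitmap

object RoaringTrimDemo {
  def main(args: Array[String]): Unit = {
    val emptyBlocks = new RoaringBitmap()
    // Simulate a map task whose output is empty for all but every 1000th
    // of 100000 reduce partitions, i.e. long runs of set bits.
    (0 until 100000).foreach { i =>
      if (i % 1000 != 0) emptyBlocks.add(i)
    }
    val before = emptyBlocks.getSizeInBytes
    // runOptimize() rewrites containers as run-length-encoded ones where
    // that representation is smaller; trim() releases unused capacity in
    // the backing arrays.
    val usedRuns = emptyBlocks.runOptimize()
    emptyBlocks.trim()
    val after = emptyBlocks.getSizeInBytes
    println(s"run containers used: $usedRuns, $before bytes -> $after bytes")
  }
}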