Skip to content

Commit

Permalink
[SPARK-24519][CORE] Compute SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS …
Browse files Browse the repository at this point in the history
…only once

## What changes were proposed in this pull request?
Previously SPARK-24519 created a modifiable config SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS. However, the config was being parsed on every creation of MapStatus, which could be very expensive. Another problem with the previous approach is that it created the illusion that this could be changed dynamically at runtime, which was not true. This PR changes it so the config is computed only once.

## How was this patch tested?
Removed a test case that's no longer valid.

Closes #22521 from rxin/SPARK-24519.

Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit e702fb1)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
rxin authored and dongjoon-hyun committed Sep 26, 2018
1 parent dc60476 commit 8d17200
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 31 deletions.
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,16 @@ private[spark] sealed trait MapStatus {

private[spark] object MapStatus {

/**
 * Minimum number of partitions before [[HighlyCompressedMapStatus]] is used. Resolved lazily
 * (and therefore only once) because in test code SparkEnv.get may be null; in that case we
 * fall back to the config entry's default value.
 */
private lazy val minPartitionsToUseHighlyCompressMapStatus = {
  val configured = Option(SparkEnv.get).map { env =>
    env.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS)
  }
  configured.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)
}

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,32 +188,4 @@ class MapStatusSuite extends SparkFunSuite {
assert(count === 3000)
}
}

// NOTE(review): this test mocks SparkEnv so that MapStatus.apply reads the threshold from a
// mutable SparkConf on every call. It was removed by this commit because the threshold is now
// read only once (lazily), so re-setting the config between MapStatus creations no longer has
// any effect — the test's premise is no longer valid.
test("SPARK-24519: HighlyCompressedMapStatus has configurable threshold") {
val conf = new SparkConf()
val env = mock(classOf[SparkEnv])
// Make SparkEnv.get.conf return our mutable conf for the duration of the test.
doReturn(conf).when(env).conf
SparkEnv.set(env)
// 500 partitions, each with a nonzero size.
val sizes = Array.fill[Long](500)(150L)
// Test default value
val status = MapStatus(null, sizes)
assert(status.isInstanceOf[CompressedMapStatus])
// Test Non-positive values
for (s <- -1 to 0) {
assertThrows[IllegalArgumentException] {
// Setting a non-positive threshold is expected to fail validation.
conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
val status = MapStatus(null, sizes)
}
}
// Test positive values
Seq(1, 100, 499, 500, 501).foreach { s =>
conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
val status = MapStatus(null, sizes)
// Strictly-greater comparison: exactly `s` partitions still uses CompressedMapStatus.
if(sizes.length > s) {
assert(status.isInstanceOf[HighlyCompressedMapStatus])
} else {
assert(status.isInstanceOf[CompressedMapStatus])
}
}
}
}

0 comments on commit 8d17200

Please sign in to comment.