Skip to content

Commit

Permalink
[SPARK-20955][CORE] Intern "executorId" to reduce the memory usage
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

In [this line](https://github.com/apache/spark/blob/f7cf2096fdecb8edab61c8973c07c6fc877ee32d/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L128), it uses the `executorId` string received from executors and finally it will go into `TaskUIData`. As deserializing the `executorId` string will always create a new instance, we have a lot of duplicated string instances.

This PR does a String interning for TaskUIData to reduce the memory usage.

## How was this patch tested?

Manually test using `bin/spark-shell --master local-cluster[6,1,1024]`. Test codes:
```
for (_ <- 1 to 10) { sc.makeRDD(1 to 1000, 1000).count() }
Thread.sleep(2000)
val l = sc.getClass.getMethod("jobProgressListener").invoke(sc).asInstanceOf[org.apache.spark.ui.jobs.JobProgressListener]
org.apache.spark.util.SizeEstimator.estimate(l.stageIdToData)
```
This PR reduces the size of `stageIdToData` from 3487280 to 3009744 (86.3%) in the above case.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #18177 from zsxwing/SPARK-20955.
  • Loading branch information
zsxwing committed Jun 2, 2017
1 parent e11d90b commit 16186cd
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable
import scala.collection.mutable.{HashMap, LinkedHashMap}

import com.google.common.collect.Interners

import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor._
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
Expand Down Expand Up @@ -141,6 +143,14 @@ private[spark] object UIData {
}

object TaskUIData {

private val stringInterner = Interners.newWeakInterner[String]()

/** String interning to reduce the memory usage. */
private def weakIntern(s: String): String = {
stringInterner.intern(s)
}

def apply(taskInfo: TaskInfo): TaskUIData = {
new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
}
Expand All @@ -155,8 +165,8 @@ private[spark] object UIData {
index = taskInfo.index,
attemptNumber = taskInfo.attemptNumber,
launchTime = taskInfo.launchTime,
executorId = taskInfo.executorId,
host = taskInfo.host,
executorId = weakIntern(taskInfo.executorId),
host = weakIntern(taskInfo.host),
taskLocality = taskInfo.taskLocality,
speculative = taskInfo.speculative
)
Expand Down

0 comments on commit 16186cd

Please sign in to comment.