[SPARK-9824] [CORE] Fix the issue that InternalAccumulator leaks WeakReference

`InternalAccumulator.create` doesn't call `registerAccumulatorForCleanup` to register the accumulators it creates with `ContextCleaner`, so the `WeakReference`s held for these accumulators in `Accumulators.originals` are never removed.
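
For context, here is a minimal sketch of the registry bookkeeping involved. The names are hypothetical simplifications modeled on the Spark 1.x `Accumulators` object, not the actual source:

```scala
import scala.collection.mutable
import scala.ref.WeakReference

// Simplified model of the registry that leaks; in Spark this bookkeeping
// lives in the Accumulators object.
object AccumulatorRegistry {
  // accumulator ID -> weak reference to the accumulator
  val originals = mutable.Map.empty[Long, WeakReference[AnyRef]]

  def register(id: Long, acc: AnyRef): Unit =
    // The WeakReference lets the accumulator itself be garbage-collected,
    // but the map entry (the key plus the cleared reference) lingers
    // until someone explicitly calls remove(id).
    originals(id) = WeakReference(acc)

  // ContextCleaner calls this for accumulators registered for cleanup.
  // Internal accumulators were never registered, so this never ran for them.
  def remove(id: Long): Unit = originals.remove(id)
}
```

The `WeakReference` keeps the accumulator collectible, but the map entry persists until `remove` runs; without registration with `ContextCleaner`, nothing ever calls `remove` for internal accumulators, so the map grows with every stage.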

This PR adds `registerAccumulatorForCleanup` calls for the internal accumulators to fix the memory leak.

Author: zsxwing <zsxwing@gmail.com>

Closes #8108 from zsxwing/internal-accumulators-leak.
zsxwing authored and rxin committed Aug 11, 2015
1 parent 00c0272 commit f16bc68
Showing 3 changed files with 16 additions and 11 deletions.
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -382,14 +382,18 @@ private[spark] object InternalAccumulator {
    * add to the same set of accumulators. We do this to report the distribution of accumulator
    * values across all tasks within each stage.
    */
-  def create(): Seq[Accumulator[Long]] = {
-    Seq(
+  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+    val internalAccumulators = Seq(
       // Execution memory refers to the memory used by internal data structures created
       // during shuffles, aggregations and joins. The value of this accumulator should be
       // approximately the sum of the peak sizes across all such data structures created
       // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
       new Accumulator(
         0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
     ) ++ maybeTestAccumulator.toSeq
+    internalAccumulators.foreach { accumulator =>
+      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+    }
+    internalAccumulators
   }
 }
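
Note that `SparkContext.cleaner` is an `Option[ContextCleaner]` (the cleaner can be disabled by configuration), which is why the fix registers via `sc.cleaner.foreach(...)` rather than calling the cleaner directly. Roughly what registration buys is sketched below; this reuses the hypothetical `AccumulatorRegistry` model above and is not the actual `ContextCleaner` code:

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hedged sketch of a ContextCleaner-style cleanup path: once a registered
// accumulator becomes unreachable, the GC enqueues its weak reference and
// the stale registry entry is purged.
class CleanerSketch {
  private val queue = new ReferenceQueue[AnyRef]()

  // Keep the accumulator ID next to the weak reference so the registry
  // entry can be removed after the accumulator itself has been collected.
  private class AccumRef(acc: AnyRef, val id: Long)
      extends WeakReference[AnyRef](acc, queue)

  // Strong references to the AccumRef wrappers themselves, so they survive
  // long enough to be enqueued by the garbage collector.
  private val pending = mutable.Set.empty[AccumRef]

  def registerAccumulatorForCleanup(id: Long, acc: AnyRef): Unit =
    pending += new AccumRef(acc, id)

  // The real cleaner polls its reference queue on a background thread.
  def keepCleaning(): Unit = {
    var ref = queue.poll()
    while (ref != null) {
      val accumRef = ref.asInstanceOf[AccumRef]
      pending -= accumRef
      AccumulatorRegistry.remove(accumRef.id) // drop the stale map entry
      ref = queue.poll()
    }
  }
}
```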
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -81,7 +81,7 @@ private[spark] abstract class Stage(
    * accumulators here again will override partial values from the finished tasks.
    */
   def resetInternalAccumulators(): Unit = {
-    _internalAccumulators = InternalAccumulator.create()
+    _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
   }
 
   /**
3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -160,7 +160,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
   }
 
   test("internal accumulators in TaskContext") {
-    val accums = InternalAccumulator.create()
+    sc = new SparkContext("local", "test")
+    val accums = InternalAccumulator.create(sc)
     val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
     val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
     val collectedInternalAccums = taskContext.collectInternalAccumulators()
