From 39a82f2b3ccf68a9a4d1abb0561b5680278a2610 Mon Sep 17 00:00:00 2001
From: Nathan Kronenfeld
Date: Tue, 2 Dec 2014 23:11:11 -0500
Subject: [PATCH 1/4] Clear local copies of accumulators as soon as we're done
 with them

---
 core/src/main/scala/org/apache/spark/Accumulators.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 000bbd6b532ad..cfaf4441374e1 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -311,6 +311,7 @@ private object Accumulators {
     for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
       ret(id) = accum.localValue
     }
+    localAccums.remove(Thread.currentThread)
     return ret
   }
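
The change above is a one-line fix with a real effect: localAccums is keyed by
Thread, and executor threads are pooled and long-lived, so an entry that is
never removed keeps every accumulator registered by every task that ran on
that thread reachable. A minimal standalone sketch of the pattern and the fix,
using hypothetical names rather than Spark's actual classes:

  import scala.collection.mutable

  object ThreadKeyedRegistry {
    private val perThread = mutable.Map[Thread, mutable.Map[Long, Any]]()

    def register(id: Long, value: Any): Unit = synchronized {
      perThread.getOrElseUpdate(Thread.currentThread, mutable.Map())(id) = value
    }

    // Harvest this thread's values and drop its entry, as the patch does,
    // so a pooled worker thread does not pin stale per-task state forever.
    def drainValues(): Map[Long, Any] = synchronized {
      val snapshot = perThread.getOrElse(Thread.currentThread, mutable.Map()).toMap
      perThread.remove(Thread.currentThread)
      snapshot
    }
  }

Patch 2 below replaces the Thread-keyed map wholesale with a ThreadLocal and
moves this cleanup into the task's finally block.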
From 537baad0379644537f21385f0cc1150b4af0b237 Mon Sep 17 00:00:00 2001
From: Nathan Kronenfeld
Date: Fri, 5 Dec 2014 14:44:38 -0500
Subject: [PATCH 2/4] Fuller refactoring as intended: incorporate JR's
 suggestion of a ThreadLocal for localAccums, keeping clear() but calling it
 in each task's finally block rather than just at the beginning of the task.

---
 .../main/scala/org/apache/spark/Accumulators.scala | 13 +++++++------
 .../scala/org/apache/spark/executor/Executor.scala |  3 ++-
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index cfaf4441374e1..f2fd9848e6d53 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -19,6 +19,7 @@ package org.apache.spark

 import java.io.{ObjectInputStream, Serializable}
 import java.util.concurrent.atomic.AtomicLong
+import java.lang.ThreadLocal

 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
@@ -281,7 +282,9 @@ object AccumulatorParam {

 private object Accumulators {
   // TODO: Use soft references? => need to make readObject work properly then
   val originals = Map[Long, Accumulable[_, _]]()
-  val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
+  val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
+  }
   var lastId: Long = 0

   def newId(): Long = synchronized {
@@ -293,25 +296,23 @@ private object Accumulators {
     if (original) {
       originals(a.id) = a
     } else {
-      val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
-      accums(a.id) = a
+      localAccums.get()(a.id) = a
     }
   }

   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.remove(Thread.currentThread)
+      localAccums.get.clear
     }
   }

   // Get the values of the local accumulators for the current thread (by ID)
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
-    for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
+    for ((id, accum) <- localAccums.get) {
       ret(id) = accum.localValue
     }
-    localAccums.remove(Thread.currentThread)
     return ret
   }

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 835157fc520aa..52de6980ecbf8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -172,7 +172,6 @@ private[spark] class Executor(
     val startGCTime = gcTime

     try {
-      Accumulators.clear()
       val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
       updateDependencies(taskFiles, taskJars)
       task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
@@ -278,6 +277,8 @@ private[spark] class Executor(
       env.shuffleMemoryManager.releaseMemoryForThisThread()
       // Release memory used by this thread for unrolling blocks
       env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
+      // Release memory used by this thread for accumulators
+      Accumulators.clear()
       runningTasks.remove(taskId)
     }
   }
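
Taken together, the two files changed above pair a ThreadLocal whose
initialValue hands each thread its own map with a cleanup call in the
executor's finally block. A minimal sketch of that shape, with hypothetical
names rather than Spark's actual task-running code:

  import scala.collection.mutable

  object LocalRegistry {
    private val local = new ThreadLocal[mutable.Map[Long, Any]]() {
      // Each thread lazily gets its own empty map on its first get()
      override protected def initialValue() = mutable.Map[Long, Any]()
    }

    def put(id: Long, value: Any): Unit = local.get()(id) = value
    def clear(): Unit = local.get.clear()

    // Run a task body and empty this thread's map however the body exits
    def runTask(body: => Unit): Unit = {
      try {
        body // may register thread-local entries, or throw part-way through
      } finally {
        clear() // runs on success and on failure alike
      }
    }
  }

The finally placement is the point of the Executor.scala hunks: the thread's
map is emptied even when the task throws, rather than lingering until the
next task happens to start on the same thread.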
From b6c2180f98e2fc35970b0c646c189b376c949255 Mon Sep 17 00:00:00 2001
From: Nathan Kronenfeld
Date: Fri, 5 Dec 2014 16:53:28 -0500
Subject: [PATCH 3/4] Include MiMa exclude as per build error instructions -
 this version incompatibility should be irrelevant, as it will only surface
 if a master is talking to a worker running a different version of Spark.

---
 project/MimaExcludes.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 230239aa40500..ccae8916289d7 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -53,6 +53,10 @@ object MimaExcludes {
             "org.apache.spark.mllib.linalg.Matrices.randn"),
           ProblemFilters.exclude[MissingMethodProblem](
             "org.apache.spark.mllib.linalg.Matrices.rand")
+        ) ++ Seq(
+          // SPARK-4772
+          ProblemFilters.exclude[IncompatibleResultTypeProblem](
+            "org.apache.spark.Accumulators.localAccums")
         )

       case v if v.startsWith("1.2") =>

From a581f3f99e072b39c3dcc0a103306b933ceea05b Mon Sep 17 00:00:00 2001
From: Nathan Kronenfeld
Date: Tue, 9 Dec 2014 19:09:24 -0500
Subject: [PATCH 4/4] Change Accumulators to private[spark] instead of adding
 a MiMa exclude, to get around a false positive in the MiMa tests

---
 core/src/main/scala/org/apache/spark/Accumulators.scala | 2 +-
 project/MimaExcludes.scala                              | 4 ----
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index f2fd9848e6d53..5f31bfba3f8d6 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -279,7 +279,7 @@ object AccumulatorParam {

 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private object Accumulators {
+private[spark] object Accumulators {
   // TODO: Use soft references? => need to make readObject work properly then
   val originals = Map[Long, Accumulable[_, _]]()
   val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ccae8916289d7..230239aa40500 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -53,10 +53,6 @@ object MimaExcludes {
             "org.apache.spark.mllib.linalg.Matrices.randn"),
           ProblemFilters.exclude[MissingMethodProblem](
             "org.apache.spark.mllib.linalg.Matrices.rand")
-        ) ++ Seq(
-          // SPARK-4772
-          ProblemFilters.exclude[IncompatibleResultTypeProblem](
-            "org.apache.spark.Accumulators.localAccums")
         )

       case v if v.startsWith("1.2") =>
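
Patches 3 and 4 are two resolutions of the same MiMa report. Changing
localAccums from a Map to a ThreadLocal changes the member's binary
signature, and since Scala erases a top-level private object to a public
class in bytecode, a checker that compares class files flags the change even
though no user code can reach it. Patch 3 silences the report with an
explicit exclude; patch 4 drops that exclude and widens the declaration to
private[spark] instead, which Spark's MiMa tooling evidently treats as
internal and filters out on its own. A sketch of the visibility distinction,
with illustrative names:

  package org.apache.spark

  // Erased to a public class in bytecode even though scalac restricts it to
  // this package, so a bytecode-level checker still sees its members.
  private object InternalStateA

  // Accessible throughout org.apache.spark and its subpackages; tooling that
  // reads the Scala-level signature can tell this is internal and skip it.
  private[spark] object InternalStateB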