diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 894091761485d..88cfc0c28b404 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,10 +18,12 @@ package org.apache.spark
 
 import java.io._
+import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.concurrent.Await
+import scala.collection.JavaConversions._
 
 import akka.actor._
 import akka.pattern.ask
 
@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
    * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
    * master's corresponding HashMap.
+   *
+   * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
+   * thread-safe map.
    */
   protected val mapStatuses: Map[Int, Array[MapStatus]]
 
@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
  * MapOutputTrackerMaster.
  */
 private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
-  protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+  protected val mapStatuses: Map[Int, Array[MapStatus]] =
+    new ConcurrentHashMap[Int, Array[MapStatus]]
 }
 
 private[spark] object MapOutputTracker {
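
For reference, below is a minimal standalone sketch (not part of the patch; the object name ThreadSafeMapSketch and the Array[String] stand-in for Array[MapStatus] are illustrative only) of the mechanism the last hunk relies on: the scala.collection.JavaConversions implicits let a java.util.concurrent.ConcurrentHashMap be assigned where a scala.collection.mutable.Map is expected, so concurrent reads and writes on mapStatuses are safe without extra locking.

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConversions._
import scala.collection.mutable.Map

object ThreadSafeMapSketch {
  // The JavaConversions implicit wraps the Java ConcurrentHashMap in a
  // scala.collection.mutable.Map view, mirroring the new mapStatuses declaration.
  val statuses: Map[Int, Array[String]] = new ConcurrentHashMap[Int, Array[String]]

  def main(args: Array[String]): Unit = {
    val writers = (1 to 4).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          // Each thread writes a disjoint key range; all writes target the same map.
          (0 until 1000).foreach(k => statuses.update(i * 10000 + k, Array(s"status-$k")))
        }
      })
    }
    writers.foreach(_.start())
    writers.foreach(_.join())
    println(s"entries = ${statuses.size}") // expect 4000; no ConcurrentModificationException
  }
}

JavaConversions matches the Scala 2.10-era style used in the file; newer Scala versions would express the same idea with scala.collection.JavaConverters and an explicit .asScala call.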