diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 92bef0f38ef75..a78bf4241f6f2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -572,32 +572,41 @@ class ExternalAppendOnlyMap[K, V, C](
 
     private var nextUpstream: Iterator[(K, C)] = null
 
-    private var cur: (K, C) = null
+    private var cur: (K, C) = readNext()
+
+    private var hasSpilled: Boolean = false
 
     def spill(): Boolean = synchronized {
-      if (upstream == null || nextUpstream != null) {
+      if (hasSpilled) {
         false
       } else {
         logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
           s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
         nextUpstream = spillMemoryIteratorToDisk(upstream)
+        hasSpilled = true
         true
       }
     }
 
-    override def hasNext: Boolean = synchronized {
+    def readNext(): (K, C) = synchronized {
       if (nextUpstream != null) {
         upstream = nextUpstream
         nextUpstream = null
       }
-      val r = upstream.hasNext
-      if (r) {
-        cur = upstream.next()
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
       }
-      r
     }
 
-    override def next(): (K, C) = cur
+    override def hasNext(): Boolean = cur != null
+
+    override def next(): (K, C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
   }
 
   /** Convenience function to hash the given (K, C) pair by the key. */
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 14140dc73f3a8..8bc0861ef9fee 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -785,10 +785,12 @@ private[spark] class ExternalSorter[K, V, C](
 
     private var nextUpstream: Iterator[((Int, K), C)] = null
 
-    private var cur: ((Int, K), C) = null
+    private var cur: ((Int, K), C) = readNext()
+
+    private var hasSpilled: Boolean = false
 
     def spill(): Boolean = synchronized {
-      if (upstream == null || nextUpstream != null) {
+      if (hasSpilled) {
         false
       } else {
         val inMemoryIterator = new WritablePartitionedIterator {
@@ -812,22 +814,29 @@ private[spark] class ExternalSorter[K, V, C](
           val iterator = spillReader.readNextPartition()
           iterator.map(cur => ((p, cur._1), cur._2))
         }
+        hasSpilled = true
         true
       }
     }
 
-    override def hasNext: Boolean = synchronized {
+    def readNext(): ((Int, K), C) = synchronized {
       if (nextUpstream != null) {
         upstream = nextUpstream
         nextUpstream = null
       }
-      val r = upstream.hasNext
-      if (r) {
-        cur = upstream.next()
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
       }
-      r
     }
 
-    override def next(): ((Int, K), C) = cur
+    override def hasNext(): Boolean = cur != null
+
+    override def next(): ((Int, K), C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index d122b79934186..5dbe9c615b291 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -422,9 +422,10 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
     val conf = createSparkConf(loadDefaults = false)
       .set("spark.shuffle.memoryFraction", "0.01")
       .set("spark.memory.useLegacyMode", "true")
+      .set("spark.testing.memory", "100000000")
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
-    val N = 2e6.toInt
+    val N = 2e5.toInt
     sc.parallelize(1 to N, 10)
       .map { i => (i, i) }
       .groupByKey()
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index c8f26dcb931dd..2f35153fbd192 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -609,13 +609,14 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("force to spill for sorting") {
+  test("force to spill for external sorter") {
     val conf = createSparkConf(loadDefaults = false, kryo = false)
       .set("spark.shuffle.memoryFraction", "0.01")
      .set("spark.memory.useLegacyMode", "true")
+      .set("spark.testing.memory", "100000000")
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
-    val N = 2e6.toInt
+    val N = 2e5.toInt
     val p = new org.apache.spark.HashPartitioner(10)
     val p2 = new org.apache.spark.HashPartitioner(5)
     sc.parallelize(1 to N, 10)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7730823f9411b..98d6219552e5c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -634,6 +634,11 @@ object MimaExcludes {
       // [SPARK-14628] Simplify task metrics by always tracking read/write metrics
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.readMethod"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.writeMethod")
+    ) ++ Seq(
+      // [SPARK-4452][Core] Shuffle data structures can starve others on the same thread for memory
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.collection.ExternalAppendOnlyMap"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.collection.ExternalSorter"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.collection.Spillable")
     )
 
     case v if v.startsWith("1.6") => Seq(