From 672448220309751a8d2f38a13a09d9ccf5d5963a Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 10 May 2016 11:07:11 +0800
Subject: [PATCH] init commit

---
 .../apache/spark/memory/MemoryConsumer.java   |  19 ++-
 .../spark/memory/TaskMemoryManager.java       |   7 +
 .../collection/ExternalAppendOnlyMap.scala    | 111 +++++++++++--
 .../util/collection/ExternalSorter.scala      | 150 +++++++++++++++---
 .../spark/util/collection/Spillable.scala     |  48 ++++--
 .../ExternalAppendOnlyMapSuite.scala          |  14 ++
 .../util/collection/ExternalSorterSuite.scala |  17 ++
 project/MimaExcludes.scala                    |   3 +
 8 files changed, 323 insertions(+), 46 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 36138cc9a297c..840f13b39464c 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -45,7 +45,7 @@ protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
   /**
    * Returns the size of used memory in bytes.
    */
-  long getUsed() {
+  protected long getUsed() {
     return used;
   }
 
@@ -130,4 +130,21 @@ protected void freePage(MemoryBlock page) {
     used -= page.size();
     taskMemoryManager.freePage(page, this);
   }
+
+  /**
+   * Acquires `size` bytes of on-heap execution memory; returns the number of bytes granted.
+   */
+  public long acquireOnHeapMemory(long size) {
+    long granted = taskMemoryManager.acquireExecutionMemory(size, MemoryMode.ON_HEAP, this);
+    used += granted;
+    return granted;
+  }
+
+  /**
+   * Releases `size` bytes of on-heap execution memory.
+   */
+  public void freeOnHeapMemory(long size) {
+    taskMemoryManager.releaseExecutionMemory(size, MemoryMode.ON_HEAP, this);
+    used -= size;
+  }
 }
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index a0edf262865a0..8f05824d92b3c 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -408,4 +408,11 @@ public long cleanUpAllAllocatedMemory() {
   public long getMemoryConsumptionForThisTask() {
     return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
   }
+
+  /**
+   * Returns the Tungsten memory mode (on-heap or off-heap).
+   */
+  public MemoryMode getTungstenMemoryMode() {
+    return tungstenMemoryMode;
+  }
 }
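With `getUsed()` widened from package-private to `protected` and the two new on-heap helpers above, a `MemoryConsumer` subclass can grow and shrink its accounted memory without talking to `TaskMemoryManager` directly, which is what lets `Spillable` become a consumer later in this patch. A minimal sketch of a consumer built on the new API (the class name and every method except `spill` are illustrative, not part of the patch):

```scala
import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}

// Hypothetical consumer for illustration: it budgets one growable buffer.
class ExampleConsumer(tmm: TaskMemoryManager) extends MemoryConsumer(tmm) {

  // TaskMemoryManager calls this when another consumer runs short; returning 0
  // means "nothing to give back" (this toy consumer never spills).
  override def spill(size: Long, trigger: MemoryConsumer): Long = 0L

  // acquireOnHeapMemory already adds the granted amount to `used`, and the
  // grant may be smaller than the request.
  def tryGrow(bytes: Long): Boolean = acquireOnHeapMemory(bytes) >= bytes

  // freeOnHeapMemory subtracts from `used` and returns the bytes to the pool.
  def shrink(bytes: Long): Unit = freeOnHeapMemory(bytes)
}
```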
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f6d81ee5bf05e..92c99c92666ac 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -59,10 +59,10 @@ class ExternalAppendOnlyMap[K, V, C](
     serializer: Serializer = SparkEnv.get.serializer,
     blockManager: BlockManager = SparkEnv.get.blockManager,
     context: TaskContext = TaskContext.get())
-  extends Iterable[(K, C)]
+  extends Spillable[SizeTracker](context.taskMemoryManager())
   with Serializable
   with Logging
-  with Spillable[SizeTracker] {
+  with Iterable[(K, C)] {
 
   if (context == null) {
     throw new IllegalStateException(
@@ -79,9 +79,7 @@ class ExternalAppendOnlyMap[K, V, C](
     this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
   }
 
-  override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
-
-  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+  @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
@@ -115,6 +113,8 @@ class ExternalAppendOnlyMap[K, V, C](
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
 
+  @volatile private var readingIterator: SpillableIterator = null
+
   /**
    * Number of files this map has spilled so far.
    * Exposed for testing.
@@ -180,6 +180,29 @@ class ExternalAppendOnlyMap[K, V, C](
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
   override protected[this] def spill(collection: SizeTracker): Unit = {
+    val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator)
+    val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator)
+    spilledMaps.append(diskMapIterator)
+  }
+
+  /**
+   * Force spilling of the current in-memory collection to disk to release memory. It will be
+   * called by TaskMemoryManager when there is not enough memory for the task.
+   */
+  override protected[this] def forceSpill(): Boolean = {
+    assert(readingIterator != null)
+    val isSpilled = readingIterator.spill()
+    if (isSpilled) {
+      currentMap = null
+    }
+    isSpilled
+  }
+
+  /**
+   * Spill the contents of the in-memory iterator to a temporary file on disk.
+   */
+  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
+      : DiskMapIterator = {
     val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -200,9 +223,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
     var success = false
     try {
-      val it = currentMap.destructiveSortedIterator(keyComparator)
-      while (it.hasNext) {
-        val kv = it.next()
+      while (inMemoryIterator.hasNext) {
+        val kv = inMemoryIterator.next()
         writer.write(kv._1, kv._2)
         objectsWritten += 1
 
@@ -235,7 +257,17 @@ class ExternalAppendOnlyMap[K, V, C](
       }
     }
 
-    spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
+    new DiskMapIterator(file, blockId, batchSizes)
+  }
+
+  /**
+   * Returns a destructive iterator for iterating over the entries of this map.
+   * If this iterator is forced to spill to disk to release memory when there is not enough
+   * memory, it returns pairs from an on-disk map.
+   */
+  def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
+    readingIterator = new SpillableIterator(inMemoryIterator)
+    readingIterator
   }
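The machinery above hinges on `readingIterator`: reads go through a wrapper whose upstream source can be redirected from the in-memory map to the freshly written spill file while iteration is in flight. A self-contained model of that swap, for illustration only (the real `SpillableIterator` appears further down in this diff and additionally prefetches one record so `hasNext` never takes the lock):

```scala
// Model of the swap performed by SpillableIterator: reads and the swap share
// one lock, so a force-spill from another thread can never interleave with a
// read of the in-memory map.
class SwappableIterator[T](initial: Iterator[T]) extends Iterator[T] {
  private val lock = new Object()
  private var upstream: Iterator[T] = initial

  def swapUpstream(replacement: Iterator[T]): Unit = lock.synchronized {
    upstream = replacement
  }

  override def hasNext: Boolean = lock.synchronized(upstream.hasNext)
  override def next(): T = lock.synchronized(upstream.next())
}
```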
 
   /**
@@ -248,15 +280,18 @@ class ExternalAppendOnlyMap[K, V, C](
         "ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
     }
     if (spilledMaps.isEmpty) {
-      CompletionIterator[(K, C), Iterator[(K, C)]](currentMap.iterator, freeCurrentMap())
+      CompletionIterator[(K, C), Iterator[(K, C)]](
+        destructiveIterator(currentMap.iterator), freeCurrentMap())
     } else {
       new ExternalIterator()
     }
   }
 
   private def freeCurrentMap(): Unit = {
-    currentMap = null // So that the memory can be garbage-collected
-    releaseMemory()
+    if (currentMap != null) {
+      currentMap = null // So that the memory can be garbage-collected
+      releaseMemory()
+    }
   }
 
   /**
@@ -270,8 +305,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
-    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](
-      currentMap.destructiveSortedIterator(keyComparator), freeCurrentMap())
+    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
+      currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
 
     private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
@@ -530,8 +565,56 @@ class ExternalAppendOnlyMap[K, V, C](
     context.addTaskCompletionListener(context => cleanup())
   }
 
+  private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
+    extends Iterator[(K, C)] {
+
+    private val SPILL_LOCK = new Object()
+
+    private var nextUpstream: Iterator[(K, C)] = null
+
+    private var cur: (K, C) = readNext()
+
+    private var hasSpilled: Boolean = false
+
+    def spill(): Boolean = SPILL_LOCK.synchronized {
+      if (hasSpilled) {
+        false
+      } else {
+        logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+          s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+        nextUpstream = spillMemoryIteratorToDisk(upstream)
+        hasSpilled = true
+        true
+      }
+    }
+
+    def readNext(): (K, C) = SPILL_LOCK.synchronized {
+      if (nextUpstream != null) {
+        upstream = nextUpstream
+        nextUpstream = null
+      }
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
+      }
+    }
+
+    override def hasNext(): Boolean = cur != null
+
+    override def next(): (K, C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
+  }
+
   /** Convenience function to hash the given (K, C) pair by the key. */
   private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
+
+  override def toString(): String = {
+    this.getClass.getName + "@" + java.lang.Integer.toHexString(this.hashCode())
+  }
 }
 
 private[spark] object ExternalAppendOnlyMap {
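For context, this map backs reduce-side aggregation such as `groupByKey`. A hedged usage sketch, assuming code that lives under the `org.apache.spark` package (the class is `private[spark]`) and runs inside a task so `TaskContext.get()` is non-null:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.collection.ExternalAppendOnlyMap

// Combine Int values per key into a buffer; iterator now hands back a
// SpillableIterator, so the map can be force-spilled even mid-read.
val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
  createCombiner = (v: Int) => ArrayBuffer(v),
  mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v,
  mergeCombiners = (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2)

map.insertAll(Iterator((1, 10), (2, 20), (1, 11)))
val combined = map.iterator // destructive: valid to call only once
```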
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 44b1d90667e65..b38651a1b5c46 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -92,10 +92,8 @@ private[spark] class ExternalSorter[K, V, C](
     partitioner: Option[Partitioner] = None,
     ordering: Option[Ordering[K]] = None,
     serializer: Option[Serializer] = None)
-  extends Logging
-  with Spillable[WritablePartitionedPairCollection[K, C]] {
-
-  override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
+  extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())
+  with Logging {
 
   private val conf = SparkEnv.get.conf
 
@@ -125,8 +123,8 @@ private[spark] class ExternalSorter[K, V, C](
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
   // store them in an array buffer.
-  private var map = new PartitionedAppendOnlyMap[K, C]
-  private var buffer = new PartitionedPairBuffer[K, C]
+  @volatile private var map = new PartitionedAppendOnlyMap[K, C]
+  @volatile private var buffer = new PartitionedPairBuffer[K, C]
 
   // Total spilling statistics
   private var _diskBytesSpilled = 0L
@@ -136,6 +134,10 @@ private[spark] class ExternalSorter[K, V, C](
   private var _peakMemoryUsedBytes: Long = 0L
   def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes
 
+  @volatile private var isShuffleSort: Boolean = true
+  private val forceSpillFiles = new ArrayBuffer[SpilledFile]
+  @volatile private var readingIterator: SpillableIterator = null
+
   // A comparator for keys K that orders them within a partition to allow aggregation or sorting.
   // Can be a partial ordering by hash code if a total ordering is not provided through by the
   // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
@@ -234,6 +236,34 @@ private[spark] class ExternalSorter[K, V, C](
    * @param collection whichever collection we're using (map or buffer)
    */
   override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
+    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
+    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
+    spills.append(spillFile)
+  }
+
+  /**
+   * Force spilling of the current in-memory collection to disk to release memory. It will be
+   * called by TaskMemoryManager when there is not enough memory for the task.
+   */
+  override protected[this] def forceSpill(): Boolean = {
+    if (isShuffleSort) {
+      false
+    } else {
+      assert(readingIterator != null)
+      val isSpilled = readingIterator.spill()
+      if (isSpilled) {
+        map = null
+        buffer = null
+      }
+      isSpilled
+    }
+  }
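`forceSpill()` deliberately answers `false` while `isShuffleSort` is true: during a shuffle write the collection is drained by `writePartitionedFile` with no `SpillableIterator` in the way, so there is no safe point at which to redirect reads. Only sorts consumed through `iterator` opt in. A sketch of such a sort (constructor shape as in this patch's base revision; like the map above, this assumes a live task context and `private[spark]` access):

```scala
import org.apache.spark.{HashPartitioner, TaskContext}
import org.apache.spark.util.collection.ExternalSorter

// No aggregator, but an ordering: data is consumed through iterator, which
// flips isShuffleSort to false, so forceSpill() can succeed from then on.
val sorter = new ExternalSorter[Int, Long, Long](
  TaskContext.get(),
  aggregator = None,
  partitioner = Some(new HashPartitioner(2)),
  ordering = Some(Ordering.Int))

sorter.insertAll(Iterator((3, 30L), (1, 10L), (2, 20L)))
val sorted = sorter.iterator.toArray // consume; force-spillable meanwhile
sorter.stop()
```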
+
+  /**
+   * Spill the contents of the in-memory iterator to a temporary file on disk.
+   */
+  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
+      : SpilledFile = {
     // Because these files may be read during shuffle, their compression must be controlled by
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more context.
@@ -270,12 +300,11 @@ private[spark] class ExternalSorter[K, V, C](
 
     var success = false
     try {
-      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
-      while (it.hasNext) {
-        val partitionId = it.nextPartition()
+      while (inMemoryIterator.hasNext) {
+        val partitionId = inMemoryIterator.nextPartition()
         require(partitionId >= 0 && partitionId < numPartitions,
           s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
-        it.writeNext(writer)
+        inMemoryIterator.writeNext(writer)
         elementsPerPartition(partitionId) += 1
         objectsWritten += 1
 
@@ -307,7 +336,7 @@ private[spark] class ExternalSorter[K, V, C](
       }
     }
 
-    spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition))
+    SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
   }
 
   /**
@@ -598,6 +627,20 @@ private[spark] class ExternalSorter[K, V, C](
     }
   }
 
+  /**
+   * Returns a destructive iterator for iterating over the entries of the in-memory collection.
+   * If this iterator is forced to spill to disk to release memory when there is not enough
+   * memory, it returns pairs from an on-disk store.
+   */
+  def destructiveIterator(memoryIterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
+    if (isShuffleSort) {
+      memoryIterator
+    } else {
+      readingIterator = new SpillableIterator(memoryIterator)
+      readingIterator
+    }
+  }
+
   /**
    * Return an iterator over all the data written to this object, grouped by partition and
    * aggregated by the requested aggregator. For each partition we then have an iterator over its
@@ -617,21 +660,26 @@ private[spark] class ExternalSorter[K, V, C](
       // we don't even need to sort by anything other than partition ID
       if (!ordering.isDefined) {
         // The user hasn't requested sorted keys, so only sort by partition ID, not key
-        groupByPartition(collection.partitionedDestructiveSortedIterator(None))
+        groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
       } else {
         // We do need to sort by both partition ID and key
-        groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator)))
+        groupByPartition(destructiveIterator(
+          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
       }
     } else {
       // Merge spilled and in-memory data
-      merge(spills, collection.partitionedDestructiveSortedIterator(comparator))
+      merge(spills, destructiveIterator(
+        collection.partitionedDestructiveSortedIterator(comparator)))
     }
   }
 
   /**
    * Return an iterator over all the data written to this object, aggregated by our aggregator.
    */
-  def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
+  def iterator: Iterator[Product2[K, C]] = {
+    isShuffleSort = false
+    partitionedIterator.flatMap(pair => pair._2)
+  }
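At the RDD level, the consumers that reach this `iterator` (and therefore become force-spillable) are read-side sorts and aggregations rather than shuffle writes; for example, the reduce side of `sortByKey` feeds shuffled blocks into an `ExternalSorter` and drains it through `iterator`. Given an active SparkContext `sc`:

```scala
// Read-side sort: under memory pressure, this sorter's in-memory buffer can
// now be spilled at the request of another memory consumer in the same task.
sc.parallelize(1 to 1000000, 4)
  .map(i => (i % 1000, i))
  .sortByKey()
  .count()
```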
 
   /**
    * Write all the data added into this ExternalSorter into a file in the disk store. This is
@@ -687,11 +735,15 @@ private[spark] class ExternalSorter[K, V, C](
   }
 
   def stop(): Unit = {
-    map = null // So that the memory can be garbage-collected
-    buffer = null // So that the memory can be garbage-collected
     spills.foreach(s => s.file.delete())
     spills.clear()
-    releaseMemory()
+    forceSpillFiles.foreach(s => s.file.delete())
+    forceSpillFiles.clear()
+    if (map != null || buffer != null) {
+      map = null // So that the memory can be garbage-collected
+      buffer = null // So that the memory can be garbage-collected
+      releaseMemory()
+    }
   }
 
   /**
@@ -725,4 +777,66 @@ private[spark] class ExternalSorter[K, V, C](
       (elem._1._2, elem._2)
     }
   }
+
+  private[this] class SpillableIterator(var upstream: Iterator[((Int, K), C)])
+    extends Iterator[((Int, K), C)] {
+
+    private val SPILL_LOCK = new Object()
+
+    private var nextUpstream: Iterator[((Int, K), C)] = null
+
+    private var cur: ((Int, K), C) = readNext()
+
+    private var hasSpilled: Boolean = false
+
+    def spill(): Boolean = SPILL_LOCK.synchronized {
+      if (hasSpilled) {
+        false
+      } else {
+        val inMemoryIterator = new WritablePartitionedIterator {
+          private[this] var cur = if (upstream.hasNext) upstream.next() else null
+
+          def writeNext(writer: DiskBlockObjectWriter): Unit = {
+            writer.write(cur._1._2, cur._2)
+            cur = if (upstream.hasNext) upstream.next() else null
+          }
+
+          def hasNext(): Boolean = cur != null
+
+          def nextPartition(): Int = cur._1._1
+        }
+        logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+          s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+        val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
+        forceSpillFiles.append(spillFile)
+        val spillReader = new SpillReader(spillFile)
+        nextUpstream = (0 until numPartitions).iterator.flatMap { p =>
+          val iterator = spillReader.readNextPartition()
+          iterator.map(cur => ((p, cur._1), cur._2))
+        }
+        hasSpilled = true
+        true
+      }
+    }
+
+    def readNext(): ((Int, K), C) = SPILL_LOCK.synchronized {
+      if (nextUpstream != null) {
+        upstream = nextUpstream
+        nextUpstream = null
+      }
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
+      }
+    }
+
+    override def hasNext(): Boolean = cur != null
+
+    override def next(): ((Int, K), C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
+  }
 }
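One subtlety shared by both `SpillableIterator` classes: a `null` `cur` is the end-of-stream sentinel, which is safe here because every record is a tuple and never null. A minimal model of that `readNext()` protocol, for illustration:

```scala
// Turn a null-terminated readNext() into an Iterator, as both
// SpillableIterator implementations do with their `cur` field.
def drain[A <: AnyRef](readNext: () => A): Iterator[A] =
  Iterator.continually(readNext()).takeWhile(_ ne null)
```

Note also that the spilled file stores only `(K, C)` pairs per partition, so `nextUpstream` re-attaches partition ids by reading partitions `0 until numPartitions` back in order.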
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 3a48af82b1dae..6796fb99bd2dd 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.util.collection
 
-import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
 import org.apache.spark.{Logging, SparkEnv}
 
 /**
  * Spills contents of an in-memory collection to disk when the memory threshold
  * has been exceeded.
  */
-private[spark] trait Spillable[C] extends Logging {
+private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
+  extends MemoryConsumer(taskMemoryManager) with Logging {
   /**
    * Spills the current in-memory collection to disk, and releases the memory.
    *
@@ -32,16 +33,19 @@ private[spark] trait Spillable[C] extends Logging {
    */
   protected def spill(collection: C): Unit
 
+  /**
+   * Force spilling of the current in-memory collection to disk to release memory. It will be
+   * called by TaskMemoryManager when there is not enough memory for the task.
+   */
+  protected def forceSpill(): Boolean
+
   // Number of elements read from input since last spill
   protected def elementsRead: Long = _elementsRead
 
   // Called by subclasses every time a record is read
   // It's used for checking spilling frequency
   protected def addElementsRead(): Unit = { _elementsRead += 1 }
 
-  // Memory manager that can be used to acquire/release memory
-  protected[this] def taskMemoryManager: TaskMemoryManager
-
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // For testing only
   private[this] val initialMemoryThreshold: Long =
@@ -54,13 +58,13 @@ private[spark] trait Spillable[C] extends Logging {
 
   // Threshold for this collection's size in bytes before we start tracking its memory usage
   // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
-  private[this] var myMemoryThreshold = initialMemoryThreshold
+  @volatile private[this] var myMemoryThreshold = initialMemoryThreshold
 
   // Number of elements read from input since last spill
-  private[this] var _elementsRead = 0L
+  @volatile private[this] var _elementsRead = 0L
 
   // Number of bytes spilled in total
-  private[this] var _memoryBytesSpilled = 0L
+  @volatile private[this] var _memoryBytesSpilled = 0L
 
   // Number of spills
   private[this] var _spillCount = 0
@@ -78,8 +82,7 @@ private[spark] trait Spillable[C] extends Logging {
     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
       // Claim up to double our current memory from the shuffle memory pool
       val amountToRequest = 2 * currentMemory - myMemoryThreshold
-      val granted =
-        taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
+      val granted = acquireOnHeapMemory(amountToRequest)
       myMemoryThreshold += granted
       // If we were granted too little memory to grow further (either tryToAcquire returned 0,
       // or we already had more memory than myMemoryThreshold), spill the current collection
@@ -98,6 +101,27 @@ private[spark] trait Spillable[C] extends Logging {
     shouldSpill
   }
 
+  /**
+   * Spills some data to disk to release memory. It will be called by TaskMemoryManager
+   * when there is not enough memory for the task.
+   */
+  override def spill(size: Long, trigger: MemoryConsumer): Long = {
+    if (trigger != this && taskMemoryManager.getTungstenMemoryMode == MemoryMode.ON_HEAP) {
+      val isSpilled = forceSpill()
+      if (!isSpilled) {
+        0L
+      } else {
+        _elementsRead = 0
+        val freeMemory = myMemoryThreshold - initialMemoryThreshold
+        _memoryBytesSpilled += freeMemory
+        releaseMemory()
+        freeMemory
+      }
+    } else {
+      0L
+    }
+  }
+
   /**
    * @return number of bytes spilled in total
    */
@@ -107,9 +131,7 @@ private[spark] trait Spillable[C] extends Logging {
    * Release our memory back to the execution pool so that other tasks can grab it.
    */
   def releaseMemory(): Unit = {
-    // The amount we requested does not include the initial memory tracking threshold
-    taskMemoryManager.releaseExecutionMemory(
-      myMemoryThreshold - initialMemoryThreshold, MemoryMode.ON_HEAP, null)
+    freeOnHeapMemory(myMemoryThreshold - initialMemoryThreshold)
     myMemoryThreshold = initialMemoryThreshold
   }
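The `spill(size, trigger)` override is what plugs `Spillable` into `MemoryConsumer`'s cooperative protocol: when one consumer cannot get memory, `TaskMemoryManager` asks the others to hand some back. A toy model of that contract, with no Spark types (the skip of the requester mirrors the `trigger != this` guard above):

```scala
// Toy model: a manager under pressure polls consumers; each reports the bytes
// it actually freed, possibly zero. The requesting consumer is skipped, since
// it handles its own spilling through the normal maybeSpill() path.
trait Consumer { def spill(size: Long, trigger: Consumer): Long }

def reclaim(needed: Long, consumers: Seq[Consumer], trigger: Consumer): Long = {
  var freed = 0L
  for (c <- consumers if freed < needed && c != trigger) {
    freed += c.spill(needed - freed, trigger)
  }
  freed
}
```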
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index dc3185a6d505a..19675b2302c98 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -418,4 +418,18 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("force to spill for external aggregation") {
+    val conf = createSparkConf(loadDefaults = false)
+      .set("spark.shuffle.memoryFraction", "0.01")
+      .set("spark.memory.useLegacyMode", "true")
+      .set("spark.testing.memory", "100000000")
+      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+    sc = new SparkContext("local", "test", conf)
+    val N = 2e5.toInt
+    sc.parallelize(1 to N, 2)
+      .map { i => (i, i) }
+      .groupByKey()
+      .reduceByKey(_ ++ _)
+      .count()
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index d7b2d07a40052..ab627846178fc 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -610,4 +610,21 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       }
     }
   }
+
+  test("force to spill for external sorter") {
+    val conf = createSparkConf(loadDefaults = false, kryo = false)
+      .set("spark.shuffle.memoryFraction", "0.01")
+      .set("spark.memory.useLegacyMode", "true")
+      .set("spark.testing.memory", "100000000")
+      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+    sc = new SparkContext("local", "test", conf)
+    val N = 2e5.toInt
+    val p = new org.apache.spark.HashPartitioner(2)
+    val p2 = new org.apache.spark.HashPartitioner(3)
+    sc.parallelize(1 to N, 3)
+      .map { x => (x % 100000) -> x.toLong }
+      .repartitionAndSortWithinPartitions(p)
+      .repartitionAndSortWithinPartitions(p2)
+      .count()
+  }
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 8b95909179036..6814440302efd 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -165,6 +165,9 @@ object MimaExcludes {
         // SPARK-12591 Register OpenHashMapBasedStateMap for Kryo
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.serializer.KryoInputDataInputBridge"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.serializer.KryoOutputDataOutputBridge")
+      ) ++ Seq(
+        // [SPARK-4452][Core] Shuffle data structures can starve others on the same thread for memory
+        ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
       )
     case v if v.startsWith("1.5") =>
       Seq(
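Both regression tests force tiny shuffle budgets through the legacy memory manager (`spark.memory.useLegacyMode` with a 1% `spark.shuffle.memoryFraction`) so that the new force-spill path actually fires in local mode. Assuming a standard Spark checkout of this era, each suite can be run on its own with sbt 0.13's `test-only` task, for example:

```
build/sbt "core/test-only org.apache.spark.util.collection.ExternalSorterSuite"
build/sbt "core/test-only org.apache.spark.util.collection.ExternalAppendOnlyMapSuite"
```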