[SPARK-4480] Avoid many small spills in external data structures
**Summary.** Currently, we may spill many small files in `ExternalAppendOnlyMap` and `ExternalSorter`. The underlying root cause of this is summarized in [SPARK-4452](https://issues.apache.org/jira/browse/SPARK-4452). This PR does not address this root cause, but simply provides the guarantee that we never spill the in-memory data structure if its size is less than a configurable threshold of 5MB. This config is not documented because we don't want users to set it themselves, and it is not hard-coded because we need to change it in tests.
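To make the guarantee concrete, here is a minimal sketch of the spill check after this change. The names `myMemoryThreshold`, `initialMemoryThreshold`, and `tryToAcquire` mirror the `Spillable` trait shown in the diff below, but the scaffolding around them is illustrative rather than the actual implementation:
```scala
// Sketch only: the memory pool is stubbed out, and the real check also gates
// on a minimum number of elements read before estimating collection sizes.
object SpillGuardSketch {
  // Seeded from spark.shuffle.spill.initialMemoryThreshold (default 5MB)
  val initialMemoryThreshold: Long = 5L * 1024 * 1024
  var myMemoryThreshold: Long = initialMemoryThreshold

  // Stub pool that grants every request; Spark's shared pool may grant less
  def tryToAcquire(numBytes: Long): Long = numBytes

  /** Returns true if the collection should spill, given its estimated size. */
  def maybeSpill(elementsRead: Long, currentMemory: Long): Boolean = {
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Try to double our claim on the pool before resorting to a spill
      myMemoryThreshold += tryToAcquire(2 * currentMemory - myMemoryThreshold)
      // Because myMemoryThreshold starts at 5MB rather than 0, a collection
      // smaller than 5MB never reaches this point, so tiny spills disappear
      currentMemory >= myMemoryThreshold
    } else {
      false
    }
  }
}
```
With the stub pool granting every request, `maybeSpill` never returns true; in the real code it returns true only when the shared pool refuses to extend the claim.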

**Symptom.** Each spill is orders of magnitude smaller than 1MB, and there are many spills. In environments where the open-file ulimit is low, this frequently causes the "too many open files" exceptions observed in [SPARK-3633](https://issues.apache.org/jira/browse/SPARK-3633).
```
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4792 B to disk (292769 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4760 B to disk (292770 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4520 B to disk (292771 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4560 B to disk (292772 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4792 B to disk (292773 spills so far)
14/11/13 19:20:43 INFO collection.ExternalSorter: Thread 60 spilling in-memory batch of 4784 B to disk (292774 spills so far)
```

**Reproduction.** I ran the following on a small 4-node cluster with 512MB executors. Note that the back-to-back shuffle here is necessary for reasons described in [SPARK-4452](https://issues.apache.org/jira/browse/SPARK-4452). The second shuffle is a `reduceByKey` because it performs a map-side combine.
```
sc.parallelize(1 to 100000000, 100)
  .map { i => (i, i) }
  .groupByKey()
  .reduceByKey(_ ++ _)
  .count()
```
Before the change, I noticed that each thread may spill up to 1000 times, and the size of each spill is on the order of 10KB. After the change, each thread spills at most 20 times in the worst case, and the size of each spill is on the order of 1MB.

Author: Andrew Or <andrew@databricks.com>

Closes #3353 from andrewor14/avoid-small-spills and squashes the following commits:

49f380f [Andrew Or] Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/spark into avoid-small-spills
27d6966 [Andrew Or] Merge branch 'master' of github.com:apache/spark into avoid-small-spills
f4736e3 [Andrew Or] Fix tests
a919776 [Andrew Or] Avoid many small spills

(cherry picked from commit 0eb4a7f)
Signed-off-by: Andrew Or <andrew@databricks.com>
Andrew Or committed Nov 20, 2014
1 parent f21e550 commit 4a5c3d2
Showing 2 changed files with 18 additions and 12 deletions.
Spillable.scala:
```diff
@@ -24,10 +24,7 @@ import org.apache.spark.SparkEnv
  * Spills contents of an in-memory collection to disk when the memory threshold
  * has been exceeded.
  */
-private[spark] trait Spillable[C] {
-
-  this: Logging =>
-
+private[spark] trait Spillable[C] extends Logging {
   /**
    * Spills the current in-memory collection to disk, and releases the memory.
    *
@@ -45,15 +42,21 @@
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
 
-  // What threshold of elementsRead we start estimating collection size at
+  // Threshold for `elementsRead` before we start tracking this collection's memory usage
   private[this] val trackMemoryThreshold = 1000
 
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  // Exposed for testing
+  private[this] val initialMemoryThreshold: Long =
+    SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)
+
+  // Threshold for this collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private[this] var myMemoryThreshold = initialMemoryThreshold
+
   // Number of elements read from input since last spill
   private[this] var _elementsRead = 0L
 
-  // How much of the shared memory pool this collection has claimed
-  private[this] var myMemoryThreshold = 0L
-
   // Number of bytes spilled in total
   private[this] var _memoryBytesSpilled = 0L
 
@@ -102,8 +105,9 @@
    * Release our memory back to the shuffle pool so that other threads can grab it.
    */
   private def releaseMemoryForThisThread(): Unit = {
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(myMemoryThreshold - initialMemoryThreshold)
+    myMemoryThreshold = initialMemoryThreshold
   }
 
   /**
@@ -114,7 +118,7 @@
   @inline private def logSpillage(size: Long) {
     val threadId = Thread.currentThread().getId
     logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
-      .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
-      _spillCount, if (_spillCount > 1) "s" else ""))
+      .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
+        _spillCount, if (_spillCount > 1) "s" else ""))
   }
 }
```
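The release path above is the subtle half of the change: because `myMemoryThreshold` now starts at `initialMemoryThreshold` instead of 0, only the difference was ever borrowed from the shared shuffle pool, so only the difference may be returned. A self-contained sketch of that bookkeeping, with a stub standing in for Spark's `ShuffleMemoryManager`:
```scala
// Illustrative bookkeeping; `pool` is a stub, not Spark's ShuffleMemoryManager.
object MemoryBookkeepingSketch {
  object pool {
    var outstanding = 0L // bytes currently lent out by the shared pool
    def tryToAcquire(numBytes: Long): Long = { outstanding += numBytes; numBytes }
    def release(numBytes: Long): Unit = { outstanding -= numBytes }
  }

  val initialMemoryThreshold: Long = 5L * 1024 * 1024
  var myMemoryThreshold: Long = initialMemoryThreshold // starts at 5MB, not 0

  def grow(currentMemory: Long): Unit = {
    // Request only the delta beyond our current claim: the initial 5MB is
    // never drawn from the shared pool in the first place
    myMemoryThreshold += pool.tryToAcquire(2 * currentMemory - myMemoryThreshold)
  }

  def releaseAfterSpill(): Unit = {
    // Symmetric with grow(): return only what was borrowed, and reset to
    // initialMemoryThreshold (not 0) so the guarantee holds after every spill
    pool.release(myMemoryThreshold - initialMemoryThreshold)
    myMemoryThreshold = initialMemoryThreshold
  }
}
```
After any sequence of `grow` calls followed by `releaseAfterSpill()`, `pool.outstanding` returns to zero, which is exactly the invariant the new one-line comment in the diff is guarding.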
ExternalSorterSuite.scala:
```diff
@@ -127,6 +127,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester
   test("empty partitions with spilling") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
@@ -152,6 +153,7 @@
   test("empty partitions with spilling, bypass merge-sort") {
     val conf = createSparkConf(false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
+    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
 
```
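The tests shrink the threshold to 512 bytes so spilling can still be exercised with tiny inputs; outside of tests, the config should be left at its default. A minimal sketch of the same setup, mirroring the suite above with a local context:
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Mirrors the test setup: shrink both the shuffle memory fraction and the
// initial spill threshold so spilling kicks in even for small collections.
val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.001")
  .set("spark.shuffle.spill.initialMemoryThreshold", "512") // bytes
val sc = new SparkContext("local", "test", conf)
```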
