Commit 7012595: Avoid many small spills

Andrew Or committed Nov 19, 2014
1 parent aa9ebda

Showing 3 changed files with 25 additions and 10 deletions.
core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala

@@ -111,7 +111,7 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
     }
   }
 
-private object ShuffleMemoryManager {
+private[spark] object ShuffleMemoryManager {
   /**
    * Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
    * of the memory pool and a safety factor since collections can sometimes grow bigger than
@@ -122,4 +122,7 @@ private object ShuffleMemoryManager {
     val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
+
+  // Initial threshold for the size of a collection before we start tracking its memory usage
+  val INITIAL_MEMORY_TRACKING_THRESHOLD: Long = 5 * 1024 * 1024
 }
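
For a sense of scale, here is a minimal, self-contained sketch (not part of this commit; the 1 GB heap is a made-up figure) of how the defaults read by getMaxMemory combine with the new constant:

object ShuffleMemoryNumbers {
  def main(args: Array[String]): Unit = {
    // Defaults read by getMaxMemory above
    val memoryFraction = 0.2                 // spark.shuffle.memoryFraction
    val safetyFraction = 0.8                 // spark.shuffle.safetyFraction
    val maxHeap = 1024L * 1024 * 1024        // hypothetical 1 GB executor heap

    // The shared pool that all shuffle collections compete for
    val shufflePool = (maxHeap * memoryFraction * safetyFraction).toLong

    // The head start each collection now gets before touching that pool
    val initialThreshold = 5L * 1024 * 1024  // INITIAL_MEMORY_TRACKING_THRESHOLD

    println(s"shuffle pool: ${shufflePool >> 20} MB, head start: ${initialThreshold >> 20} MB")
    // => shuffle pool: 163 MB, head start: 5 MB
  }
}

Under these defaults a collection grows to 5 MB before it ever contends with other threads for the roughly 163 MB shared pool.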
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

@@ -28,10 +28,11 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
-import org.apache.spark.executor.ShuffleWriteMetrics
 
 /**
  * :: DeveloperApi ::
@@ -81,8 +82,9 @@ class ExternalAppendOnlyMap[K, V, C](
   // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
   private val trackMemoryThreshold = 1000
 
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  // Threshold for the collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private var myMemoryThreshold = ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD
 
   /**
    * Size of object batches when reading/writing from serializers.
@@ -235,8 +237,12 @@ class ExternalAppendOnlyMap[K, V, C](
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
 
     // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0L
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(
+      myMemoryThreshold - ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD)
+
+    // Reset this to the initial threshold to avoid spilling many small files
+    myMemoryThreshold = ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD
 
     elementsRead = 0
     _memoryBytesSpilled += mapSize
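
Why the release above subtracts the constant: the first 5 MB of a collection's threshold is a head start that was never acquired from the ShuffleMemoryManager, so releasing the full threshold would credit the pool with memory it never handed out. A toy model of the accounting (Pool and its methods are illustrative stand-ins, not Spark's actual API; the request size mirrors the collections' existing 2 * currentMemory - myMemoryThreshold doubling logic):

// Toy model of the claim/spill/release accounting in this commit.
// Pool stands in for ShuffleMemoryManager; sizes are illustrative.
object SpillAccountingSketch {
  val InitialThreshold = 5L * 1024 * 1024    // mirrors INITIAL_MEMORY_TRACKING_THRESHOLD

  class Pool(var free: Long) {
    def tryToAcquire(numBytes: Long): Long = {
      val granted = math.min(numBytes, free)
      free -= granted
      granted
    }
    def release(numBytes: Long): Unit = { free += numBytes }
  }

  def main(args: Array[String]): Unit = {
    val pool = new Pool(free = 160L * 1024 * 1024)
    var myMemoryThreshold = InitialThreshold  // head start, never taken from the pool

    // The collection grows to 6 MB, crossing its threshold: ask the pool to double it
    val currentMemory = 6L * 1024 * 1024
    val granted = pool.tryToAcquire(2 * currentMemory - myMemoryThreshold)  // 7 MB
    myMemoryThreshold += granted                                            // now 12 MB

    // On spill: return only the 7 MB actually acquired from the pool...
    pool.release(myMemoryThreshold - InitialThreshold)
    // ...and keep the 5 MB head start instead of resetting to zero
    myMemoryThreshold = InitialThreshold

    assert(pool.free == 160L * 1024 * 1024)   // the books balance
  }
}

Before this commit the threshold reset to zero after every spill, so a collection under memory pressure could be denied on its very first request and spill a nearly empty map over and over; keeping the 5 MB floor gives each spill file some minimum heft.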
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark._
 import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.storage.{BlockObjectWriter, BlockId}
 
 /**
@@ -134,8 +135,9 @@ private[spark] class ExternalSorter[K, V, C](
   // Write metrics for current spill
   private var curWriteMetrics: ShuffleWriteMetrics = _
 
-  // How much of the shared memory pool this collection has claimed
-  private var myMemoryThreshold = 0L
+  // Threshold for the collection's size in bytes before we start tracking its memory usage
+  // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
+  private var myMemoryThreshold = ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD
 
   // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
   // local aggregation and sorting, write numPartitions files directly and just concatenate them
@@ -284,8 +286,12 @@ private[spark] class ExternalSorter[K, V, C](
     }
 
     // Release our memory back to the shuffle pool so that other threads can grab it
-    shuffleMemoryManager.release(myMemoryThreshold)
-    myMemoryThreshold = 0
+    // The amount we requested does not include the initial memory tracking threshold
+    shuffleMemoryManager.release(
+      myMemoryThreshold - ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD)
+
+    // Reset this to the initial threshold to avoid spilling many small files
+    myMemoryThreshold = ShuffleMemoryManager.INITIAL_MEMORY_TRACKING_THRESHOLD
 
     _memoryBytesSpilled += memorySize
     elementsRead = 0
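
ExternalSorter gets the identical treatment as ExternalAppendOnlyMap: release myMemoryThreshold - INITIAL_MEMORY_TRACKING_THRESHOLD back to the pool, then reset to the constant instead of zero, so a collection that repeatedly fills and spills always keeps a 5 MB cushion and each spill file is at least roughly that size. (Incidentally, the old code reset to 0L in the map but 0 in the sorter; after this change the two paths are literally identical.)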
