diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d29b2ff58d404..e193ed222e228 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -282,18 +282,9 @@ package object config {
   private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
     ConfigBuilder("spark.shuffle.accurateBlockThreshold")
       .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
-        "record the size accurately if it's above this config and " +
-        "spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. This helps to prevent" +
-        " OOM by avoiding underestimating shuffle block size when fetch shuffle blocks.")
+        "record the size accurately if it's above this config. This helps to prevent OOM by " +
+        "avoiding underestimating shuffle block size when fetch shuffle blocks.")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
-  private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE =
-    ConfigBuilder("spark.shuffle.accurateBlockThresholdByTimesAverage")
-      .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
-        "record the size accurately if it's above this config * averageSize and " +
-        "spark.shuffle.accurateBlockThreshold. This helps to prevent OOM by avoiding " +
-        "underestimating shuffle block size when fetch shuffle blocks.")
-      .intConf
-      .createWithDefault(2)
 }
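With the multiplier gone, `spark.shuffle.accurateBlockThreshold` is the only setting left that decides which shuffle blocks get an exact size recorded in `HighlyCompressedMapStatus`. A minimal sketch of how a job might raise it; the 256 MB value and the app name are illustrative, not a recommendation:

```scala
import org.apache.spark.SparkConf

// Hypothetical job setup: record exact sizes for shuffle blocks above 256 MB
// instead of the 100 MB default. The value is given in bytes because the
// config is declared with bytesConf(ByteUnit.BYTE).
val conf = new SparkConf()
  .setAppName("shuffle-threshold-example")
  .set("spark.shuffle.accurateBlockThreshold", (256L * 1024 * 1024).toString)
```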
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 08ebbeca988e3..b93f2232a9d86 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -127,14 +127,13 @@ private[spark] class CompressedMapStatus(
 
 /**
  * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
- * than both spark.shuffle.accurateBlockThreshold and
- * spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. It stores the
- * average size of other non-empty blocks, plus a bitmap for tracking which blocks are empty.
+ * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty
+ * blocks, plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
 * @param emptyBlocks a bitmap tracking which blocks are empty
- * @param avgSize average size of the non-empty blocks
+ * @param avgSize average size of the non-empty and non-huge blocks
  * @param hugeBlockSizes sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
@@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private (
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
-  require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
+  require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
   protected def this() = this(null, -1, null, -1, null)  // For deserialization only
@@ -204,11 +203,21 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
+    val threshold = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
     while (i < totalNumBlocks) {
-      var size = uncompressedSizes(i)
+      val size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
-        totalSize += size
+        // Remove the huge blocks from the calculation for average size and have accurate size for
+        // smaller blocks.
+        if (size < threshold) {
+          totalSize += size
+        } else {
+          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
+        }
       } else {
         emptyBlocks.add(i)
       }
@@ -219,24 +228,6 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
-    val threshold1 = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
-    val threshold2 = avgSize * Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.defaultValue.get)
-    val threshold = math.max(threshold1, threshold2)
-    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
-    if (numNonEmptyBlocks > 0) {
-      i = 0
-      while (i < totalNumBlocks) {
-        if (uncompressedSizes(i) > threshold) {
-          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
-
-        }
-        i += 1
-      }
-    }
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
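The rewritten `HighlyCompressedMapStatus.apply` folds huge-block detection into the single pass that already walks the sizes: non-empty blocks below the threshold feed `totalSize`, blocks at or above it get their exact (compressed) size stored in `hugeBlockSizesArray`, and the old second pass with `threshold1`/`threshold2` disappears. A self-contained sketch of the same bookkeeping with illustrative names, not Spark internals; note that, as in the patch, the average still divides by the count of all non-empty blocks:

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch: small non-empty blocks contribute to the average, huge blocks
// (at or above the threshold) are kept with their exact size.
def summarize(sizes: Array[Long], threshold: Long): (Long, Map[Int, Long]) = {
  var numNonEmptyBlocks = 0
  var totalSize = 0L
  val hugeBlockSizes = ArrayBuffer[(Int, Long)]()
  var i = 0
  while (i < sizes.length) {
    val size = sizes(i)
    if (size > 0) {
      numNonEmptyBlocks += 1
      if (size < threshold) {
        totalSize += size
      } else {
        hugeBlockSizes += (i -> size)
      }
    }
    i += 1
  }
  val avgSize = if (numNonEmptyBlocks > 0) totalSize / numNonEmptyBlocks else 0L
  (avgSize, hugeBlockSizes.toMap)
}
```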
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index c940f7d88f834..3ec37f674c77b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -133,37 +133,9 @@ class MapStatusSuite extends SparkFunSuite {
     assert(!success)
   }
 
-  test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD is 0, blocks which are bigger than " +
-    "SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE * averageSize should not be " +
+  test("Blocks which are bigger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be " +
     "underestimated.") {
-    val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "0")
-      .set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "2")
-    val env = mock(classOf[SparkEnv])
-    doReturn(conf).when(env).conf
-    SparkEnv.set(env)
-    // Value of element in sizes is equal to the corresponding index when index >= 1000.
-    val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray)
-    val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
-    val arrayStream = new ByteArrayOutputStream(102400)
-    val objectOutputStream = new ObjectOutputStream(arrayStream)
-    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
-    objectOutputStream.writeObject(status1)
-    objectOutputStream.flush()
-    val array = arrayStream.toByteArray
-    val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
-    val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
-    val avg = sizes.sum / 2001
-    ((2 * avg + 1) to 2000).foreach {
-      case part =>
-        assert(status2.getSizeForBlock(part.toInt) >= sizes(part.toInt))
-    }
-  }
-
-  test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE is 0, blocks which are bigger than" +
-    " SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be underestimated.")
-  {
     val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "1000")
-      .set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "0")
     val env = mock(classOf[SparkEnv])
     doReturn(conf).when(env).conf
     SparkEnv.set(env)
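The two old tests collapse into one because only the byte threshold is left to exercise. The surviving test keeps the shape of the removed ones: build a `MapStatus` over more than 2000 partitions so the highly compressed form is chosen, round-trip it through Java serialization, and assert that blocks above the threshold are never reported smaller than their real size. A condensed sketch of that pattern, assuming it runs inside `MapStatusSuite` (these classes are `private[spark]`) and relying on the default 100 MB threshold instead of the suite's mocked `SparkEnv`:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.storage.BlockManagerId

// 2001 blocks: 2000 tiny ones plus a 200 MB block at index 2000, which is
// above the default spark.shuffle.accurateBlockThreshold of 100 MB.
val sizes = Array.fill[Long](2000)(10L) :+ (200L * 1024 * 1024)
val status = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
assert(status.isInstanceOf[HighlyCompressedMapStatus])

// Round-trip through Java serialization, as the suite does.
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(status)
out.flush()
val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val deserialized = in.readObject().asInstanceOf[HighlyCompressedMapStatus]

// compressSize rounds up to the next 1.1^n bucket, so the huge block must not
// come back smaller than its true size.
assert(deserialized.getSizeForBlock(2000) >= sizes(2000))
```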
diff --git a/docs/configuration.md b/docs/configuration.md
index 936de36456806..a6b6d5dfa5f95 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -617,21 +617,10 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.accurateBlockThreshold</code></td>
   <td>100 * 1024 * 1024</td>
   <td>
     When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
-    size accurately if it's above this config and <code>spark.shuffle.accurateBlockThresholdByTimesAverage</code>
-    * averageSize. This helps to
-    prevent OOM by avoiding underestimating shuffle block size when fetch shuffle blocks.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.shuffle.accurateBlockThresholdByTimesAverage</code></td>
-  <td>2</td>
-  <td>
-    When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the size
-    accurately if it's above this config * averageSize and <code>spark.shuffle.accurateBlockThreshold</code>
-    . This helps to prevent OOM by avoiding
+    size accurately if it's above this config. This helps to prevent OOM by avoiding
     underestimating shuffle block size when fetch shuffle blocks.
   </td>
 </tr>
 <tr>
   <td><code>spark.io.encryption.enabled</code></td>
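For intuition on the underestimation the description warns about: when one reducer's block dominates the output, a plain average misstates it by orders of magnitude. Made-up numbers, purely to illustrate the gap:

```scala
// 2000 blocks of 64 KB plus a single 2 GB block.
val sizes = Array.fill[Long](2000)(64L * 1024) :+ (2L * 1024 * 1024 * 1024)

// Average over every block is roughly 1.1 MB, so the 2 GB block would be
// reported almost 2000x smaller than it really is.
val plainAverage = sizes.sum / sizes.length

// With the 100 MB threshold, the huge block is tracked exactly (modulo the
// 1.1^n size compression) and the small blocks' average stays at 64 KB.
val threshold = 100L * 1024 * 1024
val (small, huge) = sizes.partition(_ < threshold)
val averageOfSmall = small.sum / small.length
```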