[SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold. #18031

Closed · wants to merge 3 commits · Changes from 1 commit

18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -278,4 +278,22 @@ package object config {
"spark.io.compression.codec.")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
ConfigBuilder("spark.shuffle.accurateBlockThreshold")
.doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
"record the size accurately if it's above this config and " +
"spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. This helps to prevent" +
" OOM by avoiding underestimating shuffle block size when fetch shuffle blocks.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100 * 1024 * 1024)

private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE =
ConfigBuilder("spark.shuffle.accurateBlockThresholdByTimesAverage")
.doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
"record the size accurately if it's above this config * averageSize and " +
"spark.shuffle.accurateBlockThreshold. This helps to prevent OOM by avoiding " +
"underestimating shuffle block size when fetch shuffle blocks.")
.intConf
.createWithDefault(2)
}
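
The two settings above combine into a single effective threshold inside HighlyCompressedMapStatus (see the code further down, where the maximum of the two is taken). The following sketch is illustrative only: the config keys are the ones defined above, while the helper function and the chosen values are hypothetical.

```scala
import org.apache.spark.SparkConf

// Sketch: a block's size is recorded accurately only if it exceeds BOTH limits,
// i.e. the effective threshold is the maximum of the absolute and relative limits.
def effectiveThreshold(conf: SparkConf, avgSize: Long): Long = {
  val absolute = conf.getSizeAsBytes("spark.shuffle.accurateBlockThreshold", "100m")
  val byTimesAvg = conf.getInt("spark.shuffle.accurateBlockThresholdByTimesAverage", 2)
  math.max(absolute, byTimesAvg * avgSize)
}

// Hypothetical tuning: record any block above 32 MB accurately, regardless of the average.
val conf = new SparkConf()
  .set("spark.shuffle.accurateBlockThreshold", "32m")
  .set("spark.shuffle.accurateBlockThresholdByTimesAverage", "0")
```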
57 changes: 51 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,13 @@ package org.apache.spark.scheduler

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.SparkEnv
import org.apache.spark.internal.config
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

@@ -121,48 +126,69 @@ private[spark] class CompressedMapStatus(
}

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are empty.
* A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
* than both spark.shuffle.accurateBlockThreshold and
* spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. It stores the
* average size of other non-empty blocks, plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param emptyBlocks a bitmap tracking which blocks are empty
* @param avgSize average size of the non-empty blocks
* @param hugeBlockSizes sizes of huge blocks by their reduceId.
*/
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
private[this] var avgSize: Long,
@transient private var hugeBlockSizes: Map[Int, Byte])

Member:
I went through part of the code in #16989. It seems to me that if all we want is to know which shuffle requests should go to disk instead of memory, do we really need to record the mapping from block ids to accurate sizes?

A simpler approach could be adding a bitmap for huge blocks, and we could simply fetch those blocks to disk. Another benefit of doing this is that it avoids introducing another config, REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM, to decide which blocks go to disk.
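
For illustration only, a minimal sketch of the alternative suggested in this comment (a bitmap flagging huge blocks instead of a size map); the class and field names here are hypothetical and not part of the PR:

```scala
import org.roaringbitmap.RoaringBitmap

// Hypothetical variant: record only WHICH blocks are huge, not how large they are.
// A fetcher could then route flagged blocks straight to disk without a size estimate.
class BitmapOnlyMapStatus(
    emptyBlocks: RoaringBitmap,
    hugeBlocks: RoaringBitmap,  // blocks above the threshold, by reduceId
    avgSize: Long) {

  def getSizeForBlock(reduceId: Int): Long =
    if (emptyBlocks.contains(reduceId)) 0L else avgSize

  // The decision the review comment cares about: should this block go to disk?
  def shouldFetchToDisk(reduceId: Int): Boolean = hugeBlocks.contains(reduceId)
}
```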

Author:
Yes, I think it makes sense to add a bitmap for huge blocks, but I'm a little hesitant. I still prefer to keep hugeBlockSizes independent from the upper-level logic. In addition, the accurate sizes of blocks also have a positive effect on pending requests (e.g. spark.reducer.maxSizeInFlight can control the size of pending requests better).

Member:
The control of spark.reducer.maxSizeInFlight is not a big problem. It seems to me that any block considered huge should break maxSizeInFlight and can't be fetched in parallel. We don't actually need to know the accurate size of huge blocks; we just need to know that a block is huge and that it is larger than maxSizeInFlight.

Contributor:
@viirya We had this discussion before in the earlier PR (which this is split from).
maxSizeInFlight is meant to control how much data can be fetched in parallel and is tuned based on network throughput, not memory (though currently the two are directly dependent due to an implementation detail).
In reality, it is fairly small compared to what can be held in memory (48 MB is the default, iirc). Since the memory and IO subsystems have different characteristics, using the same config to control behavior in both will lead to suboptimal behavior (for example, on large-memory systems where large amounts can be held in memory but network bandwidth is not proportionally higher).

extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1) // For deserialization only
protected def this() = this(null, -1, null, -1, null) // For deserialization only

override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
assert(hugeBlockSizes != null)
if (emptyBlocks.contains(reduceId)) {
0
} else {
avgSize
hugeBlockSizes.get(reduceId) match {
case Some(size) => MapStatus.decompressSize(size)
case None => avgSize
}
}
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
out.writeInt(hugeBlockSizes.size)
hugeBlockSizes.foreach { kv =>
out.writeInt(kv._1)
out.writeByte(kv._2)
}
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
val count = in.readInt()
val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
(0 until count).foreach { _ =>
val block = in.readInt()
val size = in.readByte()
hugeBlockSizesArray += Tuple2(block, size)
}
hugeBlockSizes = hugeBlockSizesArray.toMap
}
}

@@ -193,8 +219,27 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
val threshold1 = Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
.getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
val threshold2 = avgSize * Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE))
.getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.defaultValue.get)
val threshold = math.max(threshold1, threshold2)

Contributor (@wzhfy, May 19, 2017):
Just out of curiosity: is there a reason we compute the threshold this way? Is it an empirical threshold?

Contributor (@wzhfy, May 19, 2017):
Suppose each map task produces a 90 MB bucket and many small buckets (skewed data); then avgSize can be very small, and the threshold would be 100 MB because 100 MB (threshold1) > 2 * avgSize (threshold2). If the number of map tasks is large (several hundred or more), OOM can still happen, right?
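
A quick back-of-the-envelope check of this scenario (hypothetical sizes, just to make the arithmetic concrete):

```scala
// Hypothetical skewed map output: 1000 buckets of 10 KB each plus one 90 MB bucket.
val sizes: Array[Long] = Array.fill(1000)(10L * 1024) :+ (90L * 1024 * 1024)

val avgSize = sizes.sum / sizes.length            // roughly 102 KB
val threshold1 = 100L * 1024 * 1024               // spark.shuffle.accurateBlockThreshold default
val threshold2 = 2 * avgSize                      // byTimesAverage default (2) * avgSize
val threshold = math.max(threshold1, threshold2)  // 100 MB

// The 90 MB bucket stays below the threshold, so it is reported as avgSize (~102 KB),
// which is exactly the underestimation this comment worries about.
assert(sizes.last < threshold)
```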

Author:
@wzhfy Thanks for taking the time to review this :)
This PR is based on the discussion in #16989. The idea is to avoid underestimating big blocks in HighlyCompressedMapStatus while keeping the size of HighlyCompressedMapStatus under control at the same time.

Author:
Yes, the case you mentioned above is a really good one. But setting spark.shuffle.accurateBlockThreshold means we accept sacrificing accuracy for blocks smaller than spark.shuffle.accurateBlockThreshold. If we want it to be more accurate, we can lower the threshold (in this case we could set it to 50 MB); the size of the big bucket will then be recorded accurately.

Contributor:
Yes, but my point is these two configs are difficult for users to set. Seems we still need to adjust them case by case.

Contributor (@wzhfy, May 19, 2017):
But I agree that with this PR, at least we have a workaround for the OOM problem.

Author:
Yes, this is to avoid the OOM. To adjust the value of this config, the user needs to be sophisticated. I agree that these two configs are difficult. But with the default settings, we can really avoid some OOM situations (e.g. a super huge block when skew happens).

val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
if (numNonEmptyBlocks > 0) {
i = 0
while (i < totalNumBlocks) {
if (uncompressedSizes(i) > threshold) {
hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))

}
i += 1
}

Author:
Sorry for bringing in another while loop here. I have to calculate the average size first, then filter out the huge blocks. I don't have a better implementation to merge the two while loops into one :(

}
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,

Member:
It seems to me that https://github.com/apache/spark/pull/16989/files#r117174623 is a good comment: it suggests having accurate sizes for the smaller blocks as well.

Contributor:
+1

Author:
@viirya Thanks a lot for taking the time to look into this PR :)

"remove the huge blocks from the numerator in that calculation so that you more accurately size the smaller blocks"

Yes, I think it's a really good idea to have accurate sizes for the smaller blocks. But since I'm proposing two configs (spark.shuffle.accurateBlockThreshold and spark.shuffle.accurateBlockThresholdByTimesAverage) in the current change, I would have to compute the average twice: 1) the average calculated including huge blocks, so I can filter out the huge blocks, and 2) the average calculated without huge blocks, so I can have accurate sizes for the smaller blocks. A little bit complicated, right? How about removing spark.shuffle.accurateBlockThresholdByTimesAverage? That way we can simplify the logic. @cloud-fan Any ideas about this?
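
For reference, a rough sketch of the two-pass idea (identify huge blocks first, then average only the remaining ones), assuming a single absolute threshold as proposed; the helper is illustrative, not part of the PR:

```scala
// Illustrative only: average that excludes huge blocks, plus accurate sizes for the huge ones.
def summarize(uncompressedSizes: Array[Long], threshold: Long): (Long, Map[Int, Long]) = {
  // Accurate sizes for huge blocks, keyed by reduceId.
  val hugeBlockSizes = uncompressedSizes.zipWithIndex.collect {
    case (size, reduceId) if size > threshold => reduceId -> size
  }.toMap
  // Average only the non-empty, non-huge blocks so the huge ones no longer skew the estimate.
  val small = uncompressedSizes.filter(s => s > 0 && s <= threshold)
  val avgSmallSize = if (small.nonEmpty) small.sum / small.length else 0L
  (avgSmallSize, hugeBlockSizes)
}
```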

Member:
In the current change, if almost all blocks are huge (that is, it is not a skew case), we won't mark the blocks as huge ones. Then we will still fetch them into memory?

Author:
With the default values (spark.shuffle.accurateBlockThreshold=100M and spark.shuffle.accurateBlockThresholdByTimesAverage=2), yes.
But the user can make it stricter by setting spark.shuffle.accurateBlockThreshold=0 and spark.shuffle.accurateBlockThresholdByTimesAverage=1.

Member:
I'd tend to have just one flag and simplify the configuration.

Contributor:
+1 for one flag, let's only keep spark.shuffle.accurateBlockThreshold

hugeBlockSizesArray.toMap)
}
}
core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -17,11 +17,15 @@

package org.apache.spark.scheduler

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import scala.util.Random

import org.mockito.Mockito._
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.internal.config
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.BlockManagerId

@@ -128,4 +132,54 @@ class MapStatusSuite extends SparkFunSuite {
assert(size1 === size2)
assert(!success)
}

test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD is 0, blocks which are bigger than " +
"SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE * averageSize should not be " +
"underestimated.") {
val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "0")
.set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "2")
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
SparkEnv.set(env)
// Value of element in sizes is equal to the corresponding index when index >= 1000.
val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray)
val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
val arrayStream = new ByteArrayOutputStream(102400)
val objectOutputStream = new ObjectOutputStream(arrayStream)
assert(status1.isInstanceOf[HighlyCompressedMapStatus])
objectOutputStream.writeObject(status1)
objectOutputStream.flush()
val array = arrayStream.toByteArray
val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
val avg = sizes.sum / 2001
((2 * avg + 1) to 2000).foreach {
case part =>
assert(status2.getSizeForBlock(part.toInt) >= sizes(part.toInt))
}
}

test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE is 0, blocks which are bigger than" +
" SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be underestimated.")
{
val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "1000")
.set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "0")
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
SparkEnv.set(env)
// Value of element in sizes is equal to the corresponding index.
val sizes = (0L to 2000L).toArray
val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
val arrayStream = new ByteArrayOutputStream(102400)
val objectOutputStream = new ObjectOutputStream(arrayStream)
assert(status1.isInstanceOf[HighlyCompressedMapStatus])
objectOutputStream.writeObject(status1)
objectOutputStream.flush()
val array = arrayStream.toByteArray
val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
(1001 to 2000).foreach {
case part => assert(status2.getSizeForBlock(part) >= sizes(part))
}
}
}
20 changes: 20 additions & 0 deletions docs/configuration.md
@@ -612,6 +612,26 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.io.compression.codec</code>.
</td>
</tr>
<tr>
<td><code>spark.shuffle.accurateBlockThreshold</code></td>
<td>100 * 1024 * 1024</td>
<td>
When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
size accurately if it's above both this config and
<code>spark.shuffle.accurateBlockThresholdByTimesAverage</code> * averageSize. This helps to
prevent OOM by avoiding underestimating shuffle block sizes when fetching shuffle blocks.
</td>
</tr>
<tr>
<td><code>spark.shuffle.accurateBlockThresholdByTimesAverage</code></td>
<td>2</td>
<td>
When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
size accurately if it's above both this config * averageSize and
<code>spark.shuffle.accurateBlockThreshold</code>. This helps to prevent OOM by avoiding
underestimating shuffle block sizes when fetching shuffle blocks.
</td>
</tr>
<tr>
<td><code>spark.io.encryption.enabled</code></td>
<td>false</td>
Expand Down