[SPARK-19659] Fetch big blocks to disk when shuffle-read. #16989
Conversation
Hi @jinxing64, I posted a comment on jira about the design -- I think this is a big enough change that it's worth discussing the design first. It's fine to keep working on the code as a demonstration if you want, but for now I'd ask that you label this a work-in-progress "[WIP]". (I personally have only briefly glanced at the code and am unlikely to look more closely till we sort out the design issues.) FWIW I think this will be a great feature, we just need to be thoughtful about it.
Jenkins, add to whitelist
Test build #73224 has finished for PR 16989 at commit
@squito
@rxin @squito @davies @andrewor14 @JoshRosen
Test build #75358 has finished for PR 16989 at commit
Test build #75435 has finished for PR 16989 at commit
Test build #75436 has finished for PR 16989 at commit
Test build #75441 has finished for PR 16989 at commit
Test build #75834 has finished for PR 16989 at commit
Jenkins, test this please
extends MapStatus with Externalizable {

  // loc could be null when the default constructor is called during deserialization
  require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
    "Average size can only be zero for map stages that produced no output")

-  protected def this() = this(null, -1, null, -1) // For deserialization only
+  def this() = this(null, -1, null, -1, null) // For deserialization only
Remove the `protected` and make this visible for test.
Test build #75855 has finished for PR 16989 at commit
Test build #75853 has finished for PR 16989 at commit
Test build #75858 has finished for PR 16989 at commit
Test build #75883 has finished for PR 16989 at commit
Test build #75935 has finished for PR 16989 at commit
Test build #75938 has finished for PR 16989 at commit
@@ -100,7 +114,14 @@ public void onSuccess(ByteBuffer response) {
      // Immediately request all chunks -- we expect that the total size of the request is
      // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
      for (int i = 0; i < streamHandle.numChunks; i++) {
-       client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+       if (fetchToDisk) {
+         final File targetFile = new File(".",
Ideally we should use `DiskBlockManager.getFile` to store data in the file system.
Yes, I wanted to use `DiskBlockManager.getFile`, but I found it's hard to import `DiskBlockManager` from `OneForOneBlockFetcher`.
SparkEnv.get.blockManager.diskBlockManager
@cloud-fan
Yes, but `OneForOneBlockFetcher` is in the `network-shuffle` package, and I find it hard to import `SparkEnv` from the `core` package. Did I miss something? (I'm sorry if this question is stupid.)
Instead of passing a `boolean fetchToDisk`, shall we ask the caller to pass in an `Option<File> file`? My concern is that Spark has a rule about where to write temp files; we can't just write them to the current directory.
@cloud-fan
Understood. I will refine this and replace `boolean fetchToDisk` with `Option<File[]> shuffleFilesOpt`.
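If the caller passes the files down, the network-shuffle module never needs to see Spark's block-manager types. A minimal Scala sketch of the caller side under this suggestion; the helper name and the point at which it is called are assumptions for illustration, not the actual API:

```scala
import java.io.File
import org.apache.spark.SparkEnv

// Sketch only: the caller (in core) owns DiskBlockManager, so it can create the
// temp files and hand plain java.io.File handles to the network-shuffle layer.
def createShuffleTempFiles(numBlocks: Int): Array[File] = {
  val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
  Array.fill(numBlocks) {
    // createTempLocalBlock() returns (TempLocalBlockId, File); only the file is needed here.
    diskBlockManager.createTempLocalBlock()._2
  }
}

// The network-shuffle layer then receives the files (or nothing, when fetching to
// memory) instead of a boolean flag telling it where to write.
```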
}

  @Override
  public void onFailure(String streamId, Throwable cause) throws IOException {
Shall we remove the partially written file when failing?
Yes, that would be good!
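A hedged sketch of that cleanup, written in Scala for brevity (the real callback lives in Java); the helper name is illustrative:

```scala
import java.io.File
import java.nio.channels.WritableByteChannel

// Sketch only: on failure, close the channel and best-effort delete the partial
// file so later readers (or retries) never observe a truncated result.
def cleanupPartialDownload(targetFile: File, channel: WritableByteChannel): Unit = {
  try {
    channel.close()
  } finally {
    if (targetFile.exists() && !targetFile.delete()) {
      // Best effort: report and move on rather than masking the original failure.
      System.err.println(s"Failed to delete partially written file $targetFile")
    }
  }
}
```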
@@ -133,36 +135,53 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
-    private[this] var avgSize: Long)
+    private[this] var avgSize: Long,
+    private[this] var hugeBlockSizes: HashMap[Int, Byte])
add parameter doc for this
Yes, I will refine.
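For context, a minimal Scala sketch of what tracking such outliers could look like when a `HighlyCompressedMapStatus` is built: blocks larger than twice the average get recorded individually (compressed to one byte), while everything else is represented by the average. Names and the exact averaging rule here are illustrative, not the final implementation:

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.MapStatus  // for MapStatus.compressSize (spark-internal)

// Sketch: record the sizes of "huge" blocks (larger than 2x the average of the
// non-empty blocks) so the reduce side can treat them specially at fetch time.
def trackHugeBlocks(uncompressedSizes: Array[Long]): (Long, mutable.HashMap[Int, Byte]) = {
  val nonEmpty = uncompressedSizes.filter(_ > 0)
  val avgSize = if (nonEmpty.nonEmpty) nonEmpty.sum / nonEmpty.length else 0L
  val hugeBlockSizes = mutable.HashMap.empty[Int, Byte]
  uncompressedSizes.zipWithIndex.foreach { case (size, reduceId) =>
    if (avgSize > 0 && size > 2 * avgSize) {
      // MapStatus stores block sizes compressed to a single (log-scale) byte.
      hugeBlockSizes(reduceId) = MapStatus.compressSize(size)
    }
  }
  (avgSize, hugeBlockSizes)
}
```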
@@ -388,7 +388,8 @@ private[serializer] object KryoSerializer {
     classOf[Array[Short]],
     classOf[Array[Long]],
     classOf[BoundedPriorityQueue[_]],
-    classOf[SparkConf]
+    classOf[SparkConf],
+    classOf[HashMap[Int, Byte]]
What is the overhead of serializing the hash map with Kryo?
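One rough way to answer that is to measure the serialized size directly; a hedged sketch (the result naturally depends on the number of entries and on whether the class is registered):

```scala
import scala.collection.mutable.HashMap
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Rough measurement sketch: serialize a HashMap[Int, Byte] with Kryo and
// inspect the resulting byte count to gauge per-entry overhead.
val ser = new KryoSerializer(new SparkConf(false)).newInstance()
val hugeBlockSizes = HashMap((0 until 1000).map(i => i -> 1.toByte): _*)
val bytes = ser.serialize(hugeBlockSizes)
println(s"${bytes.remaining()} bytes for ${hugeBlockSizes.size} entries")
```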
@@ -42,6 +46,12 @@ private[spark] class BlockStoreShuffleReader[K, C](

   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+    val memMode =
Can we move this into `ShuffleBlockFetcherIterator`?
Yes, ideally this should be moved into `ShuffleBlockFetcherIterator`, but I didn't find a better implementation other than:

extends MemoryConsumer(tmm, tmm.pageSizeBytes(),
  if (SparkTransportConf.fromSparkConf(SparkEnv.get.conf, "shuffle").preferDirectBufs()) {
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
)

I think the above is not good-looking, and I'd be a bit hesitant to expose a `setMode` in `MemoryConsumer`.
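For reference, the selection itself is small; a hedged sketch of pulling it into a helper (where that helper should live is exactly the open question above):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.memory.MemoryMode
import org.apache.spark.network.netty.SparkTransportConf

// Sketch: pick OFF_HEAP when the shuffle transport prefers direct buffers,
// since fetched buffers will live off-heap in that case.
def shuffleReadMemoryMode(): MemoryMode = {
  val transportConf = SparkTransportConf.fromSparkConf(SparkEnv.get.conf, "shuffle")
  if (transportConf.preferDirectBufs()) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP
}
```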
Test build #77316 has finished for PR 16989 at commit
Test build #77319 has finished for PR 16989 at commit
Test build #77321 has finished for PR 16989 at commit
val diskBlockManager = mock(classOf[DiskBlockManager])
doReturn {
  var blockId = new TempLocalBlockId(UUID.randomUUID())
nit: can be val
.. sorry for nit ...
Good job! Merging to master/2.2!
## What changes were proposed in this pull request?

Currently the whole block is fetched into memory (off heap by default) when shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can be large in skew situations. If OOM happens during shuffle read, the job will be killed and users will be notified to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating more memory can resolve the OOM. However, this approach is not perfectly suitable for production environments, especially for data warehouses.

Using Spark SQL as the data engine in a warehouse, users hope to have a unified parameter (e.g. memory) with less resource wasted (resource allocated but not used). The hope is strong especially when migrating the data engine to Spark from another one (e.g. Hive). Tuning the parameter for thousands of SQLs one by one is very time consuming.

It's not always easy to predict skew situations; when they happen, it makes sense to fetch remote blocks to disk for shuffle-read rather than kill the job because of OOM. In this PR, I propose to fetch big blocks to disk (which is also mentioned in SPARK-3019):

1. Track the average size and also the outliers (which are larger than 2*avgSize) in MapStatus;
2. Request memory from `MemoryManager` before fetching blocks and release the memory to `MemoryManager` when the `ManagedBuffer` is released;
3. Fetch remote blocks to disk when acquiring memory from `MemoryManager` fails, otherwise fetch to memory.

This is an improvement for memory control when shuffling blocks and helps to avoid OOM in scenarios like below:

1. A single huge block;
2. Sizes of many blocks are underestimated in `MapStatus` and the actual footprint of the blocks is much larger than estimated.

## How was this patch tested?

Added unit tests in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`.

Author: jinxing <jinxing6042@126.com>

Closes #16989 from jinxing64/SPARK-19659.

(cherry picked from commit 3f94e64) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan @JoshRosen @mridulm @squito @viirya
}

  // Shuffle remote blocks to disk when the request is too large.
  // TODO: Encryption and compression should be considered.
Could you expand on what the TODO here is? I want to make sure this doesn't slip through the cracks and become forgotten.
Actually, I'm just going to send a follow-up PR. Ideally all local files written by Spark could be encrypted and compressed according to config. One example is `UnsafeSorterSpillWriter`: it writes data with `DiskBlockObjectWriter`, which calls `SerializerManager.wrapStream` and handles encryption and compression automatically.
I haven't really followed this review (sorry), but shuffle data is transmitted encrypted and compressed over the wire, so there might be a chance that there's nothing to do here.
Ah, that's a good point! Yeah, we don't need to encrypt and compress the data again here. I'll update this comment.
One question: do we need to encrypt and compress the data for sort buffer spills and aggregate buffer spills? cc @JoshRosen
> do we need to encrypt and compress the data for sort buffer spill and aggregate buffer spill?

Yes, but I thought I had done that in a previous change. Maybe I missed something.
…read

## What changes were proposed in this pull request?

This PR includes some minor improvement for the comments and tests in #16989.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18117 from cloud-fan/follow. (cherry picked from commit 1d62f8a) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…uffle.

In the current code (#16989), big blocks are shuffled to disk. This PR proposes to collect metrics for remote bytes fetched to disk.

Author: jinxing <jinxing6042@126.com>

Closes #18249 from jinxing64/SPARK-19937.
@@ -95,6 +97,25 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {
   }

+  @Override
+  public ManagedBuffer openStream(String streamChunkId) {
@jinxing64 this breaks the old shuffle service. We should avoid changing server-side code. Right now I just disabled this feature in #18467.
Thanks, I will try to make a PR as soon as possible.
public DownloadCallback(File targetFile, int chunkIndex) throws IOException {
  this.targetFile = targetFile;
  this.channel = Channels.newChannel(new FileOutputStream(targetFile));
Does this work with `RetryingBlockFetcher`? Let's say we have 2 chunks: "chunk 1" and "chunk 2". If "chunk 1" fails, it will fail "chunk 2" as well. However, the DownloadCallback for "chunk 2" may still be running. In this case, RetryingBlockFetcher will retry "chunk 2" as well. Hence, there will be two DownloadCallbacks writing to the same file.
One possible fix is writing to a temp file and renaming it to the target file.
👍 I will make a PR for this today.
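A minimal sketch of the temp-file-then-rename idea, written in Scala for brevity (the real DownloadCallback is Java); file naming and error handling here are illustrative:

```scala
import java.io.{File, FileOutputStream, IOException}
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}
import java.util.UUID

// Sketch: each download writes to its own temp file and only renames to the
// target on success, so a retried download can never interleave with a
// previous, still-running one on the same target file.
class TempFileDownload(targetFile: File) {
  private val tmpFile =
    new File(targetFile.getParentFile, s"${targetFile.getName}.${UUID.randomUUID()}.tmp")
  private val channel: WritableByteChannel =
    Channels.newChannel(new FileOutputStream(tmpFile))

  def onData(buf: ByteBuffer): Unit = channel.write(buf)

  def onComplete(): Unit = {
    channel.close()
    // Rename only after the full chunk has been written; the last successful rename wins.
    if (!tmpFile.renameTo(targetFile)) {
      tmpFile.delete()
      throw new IOException(s"Could not rename $tmpFile to $targetFile")
    }
  }

  def onFailure(cause: Throwable): Unit = {
    channel.close()
    tmpFile.delete() // drop the partial write
  }
}
```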
@zsxwing @cloud-fan
`OneForOneBlockFetcher` "opens blocks" asynchronously. If I understand correctly, the retry of `start()` in `OneForOneBlockFetcher` is only triggered on failure of sending `OpenBlocks`; a failure while fetching a chunk cannot trigger the retry in `RetryingBlockFetcher`. `DownloadCallback` is not initialized when the failure of "open blocks" happens. So there cannot be two `DownloadCallback`s for the same stream working at the same time.
@jinxing64 The retry logic is here:

Line 215 in 88a536b

if (shouldRetry(exception)) {

The issue is that there will be two `DownloadCallback`s downloading the same content to the same target file. When the first one finishes, `ShuffleBlockFetcherIterator` may start to read it; however, the second `DownloadCallback` may still be running and writing to the target file. This could cause `ShuffleBlockFetcherIterator` to read a partial result.
Pardon, I find it hard to believe there can be two `DownloadCallback`s downloading the same content to the same target file. In my understanding:

- When `RetryingBlockFetcher` retries, no `DownloadCallback` has been initialized yet;
- When fetching a chunk fails, a retry from `RetryingBlockFetcher` will not be triggered.
@zsxwing
Sorry, I just realized this issue. There can be a conflict between two `DownloadCallback`s. I will figure out a way to resolve this.
@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
  channel.write(buf);
I am super-late on reviewing this, apologies; just asking questions for my own understanding and to consider possible future improvements -- this won't do a zero-copy transfer, will it? That ByteBuffer is still in user space?
From my understanding, we'd need special handling to use netty's `spliceTo` when possible:
https://stackoverflow.com/questions/30322957/is-there-transferfrom-like-functionality-in-netty-for-zero-copy
but I'm still working on putting all the pieces together here and admittedly this is out of my area of expertise.
@squito This is a Java Channel; I'm not sure how to call `io.netty.channel.epoll.AbstractEpollStreamChannel.spliceTo` here.
By the way, I think this is a zero-copy transfer since the underlying buffer is an off-heap buffer.
Anyway, I found a bug here...
Right, I realize there isn't a simple one-line change here to switch to using spliceTo; I was just wondering what the behavior is.
I actually thought zero-copy and off-heap were orthogonal -- anytime netty gives you direct access to bytes, it has to be copied to user space, right?
@squito You are right. It needs a copy between user space and kernel space.
## What changes were proposed in this pull request?

This is a followup of #16989. The fetch-big-block-to-disk feature is disabled by default because it's not compatible with external shuffle services prior to Spark 2.2. The client sends a stream request to fetch block chunks, and the old shuffle service can't support it. After 2 years, Spark 2.2 has reached EOL, and now it's safe to turn on this feature by default.

## How was this patch tested?

Existing tests.

Closes #23625 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?

Currently the whole block is fetched into memory (off heap by default) when shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can be large in skew situations. If OOM happens during shuffle read, the job will be killed and users will be notified to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating more memory can resolve the OOM. However, this approach is not perfectly suitable for production environments, especially for data warehouses.

Using Spark SQL as the data engine in a warehouse, users hope to have a unified parameter (e.g. memory) with less resource wasted (resource allocated but not used). The hope is strong especially when migrating the data engine to Spark from another one (e.g. Hive). Tuning the parameter for thousands of SQLs one by one is very time consuming.

It's not always easy to predict skew situations; when they happen, it makes sense to fetch remote blocks to disk for shuffle-read rather than kill the job because of OOM.

In this PR, I propose to fetch big blocks to disk (which is also mentioned in SPARK-3019):

1. Track the average size and also the outliers (which are larger than 2*avgSize) in `MapStatus`;
2. Request memory from `MemoryManager` before fetching blocks and release the memory to `MemoryManager` when the `ManagedBuffer` is released;
3. Fetch remote blocks to disk when acquiring memory from `MemoryManager` fails, otherwise fetch to memory.

This is an improvement for memory control when shuffling blocks and helps to avoid OOM in scenarios like below:

1. A single huge block;
2. Sizes of many blocks are underestimated in `MapStatus` and the actual footprint of the blocks is much larger than estimated.

How was this patch tested?

Added unit tests in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`.
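To tie the pieces together, a hedged Scala sketch of the per-request decision described above; `acquireFetchMemory` and `createTempFiles` are illustrative hooks, not actual Spark APIs (the real logic lives in `ShuffleBlockFetcherIterator` and the network layer):

```scala
import java.io.File

// Sketch of the decision described above: try to reserve memory for the
// incoming blocks; if the reservation fails (e.g. a huge or underestimated
// block), fall back to streaming the blocks to temp files on disk.
def planFetch(
    requestSize: Long,
    acquireFetchMemory: Long => Boolean,   // illustrative hook into MemoryManager
    createTempFiles: () => Array[File]): Option[Array[File]] = {
  if (acquireFetchMemory(requestSize)) {
    None                    // fetch to memory as before; release memory when buffers are freed
  } else {
    Some(createTempFiles()) // fetch to disk; nothing to release since no memory was reserved
  }
}
```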