-
Notifications
You must be signed in to change notification settings - Fork 153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1594] improvement(client):support generating larger block size during shuffle map task by spill partial partitions data #1670
Conversation
… partial partitions data
… partial partitions data
# Conflicts: # client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
# Conflicts: # client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
… partial partitions data
… partial partitions data
@rickyma Could you help me review this pull request? |
Could you paste some test results to the community for this feature? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Left some minor comments.
List<Integer> partitionList = | ||
new ArrayList<Integer>() { | ||
{ | ||
addAll(buffers.keySet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be simplified as:
List<Integer> partitionList = new ArrayList<>(buffers.keySet());
} | ||
}; | ||
if (bufferSpillRatio < 1.0) { | ||
Collections.sort( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be simplified as:
partitionList.sort(Comparator.comparingInt(o -> buffers.get(o) == null ? 0 : buffers.get(o).getMemoryUsed()).reversed());
@@ -316,6 +350,10 @@ public synchronized List<ShuffleBlockInfo> clear() { | |||
+ dataSize | |||
+ "], memoryUsed[" | |||
+ memoryUsed | |||
+ "],number of blocks[" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, add a space here, for a better log output:
],number of blocks[
-> ], number of blocks[
@@ -316,6 +350,10 @@ public synchronized List<ShuffleBlockInfo> clear() { | |||
+ dataSize | |||
+ "], memoryUsed[" | |||
+ memoryUsed | |||
+ "],number of blocks[" | |||
+ result.size() | |||
+ "],flush ratio[" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, add a space here, for a better log output:
],flush ratio[
-> ], flush ratio[
LOG.info( | ||
String.format( | ||
"ShuffleBufferManager spill for buffer size exceeding spill threshold," | ||
+ "usedBytes[%d],inSendListBytes[%d],spill size threshold[%d]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, a better log output:
" usedBytes[%d], inSendListBytes[%d], spill size threshold[%d]",
… partial partitions data
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
Outdated
Show resolved
Hide resolved
… partial partitions data
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Although I think if you want to acheive bigger block size, maybe the temproal executor side localfile could be implemented to store the task partition shuffle data.
…1756) ### What changes were proposed in this pull request? 1. When the spill ratio is `1.0` , the process of calculating target spill size will be ignored to avoid potential race condition that the `usedBytes` and `inSendBytes` are not thread safe. This could guarantee that the all data is flushed to the shuffle server at the end of task. 2. Adding the `bufferManager's` buffer remaining check ### Why are the changes needed? Due to the #1670 , the partial data held by the bufferManager will not be flushed to shuffle servers in some corner cases, this will make task fail fast rather than silently data loss that should thanks the #1558 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests.
…umber (apache#1756) ### What changes were proposed in this pull request? 1. When the spill ratio is `1.0` , the process of calculating target spill size will be ignored to avoid potential race condition that the `usedBytes` and `inSendBytes` are not thread safe. This could guarantee that the all data is flushed to the shuffle server at the end of task. 2. Adding the `bufferManager's` buffer remaining check ### Why are the changes needed? Due to the apache#1670 , the partial data held by the bufferManager will not be flushed to shuffle servers in some corner cases, this will make task fail fast rather than silently data loss that should thanks the apache#1558 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests.
What changes were proposed in this pull request?
when spilling shuffle data, we just spill part of the reduce partition datas which hold the major space.
so, in each spilling process, the WriteBufferManager.clear() method should implement one more logic: sort the to-be spilled buffers by their size and select the top-N buffers to spill.
Why are the changes needed?
related feature #1594
Does this PR introduce any user-facing change?
No.
How was this patch tested?
new UTs.