[SPARK-3577] Report Spill size on disk for UnsafeExternalSorter #17471

Status: Closed · wants to merge 3 commits
@@ -54,7 +54,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private final BlockManager blockManager;
   private final SerializerManager serializerManager;
   private final TaskContext taskContext;
-  private ShuffleWriteMetrics writeMetrics;

   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
@@ -144,10 +143,6 @@ private UnsafeExternalSorter(
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024
     this.fileBufferSizeBytes = 32 * 1024;
-    // The spill metrics are stored in a new ShuffleWriteMetrics,
-    // and then discarded (this fixes SPARK-16827).
-    // TODO: Instead, separate spill metrics should be stored and reported (tracked in SPARK-3577).
-    this.writeMetrics = new ShuffleWriteMetrics();

     if (existingInMemorySorter == null) {
       this.inMemSorter = new UnsafeInMemorySorter(
@@ -199,6 +194,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
       spillWriters.size(),
       spillWriters.size() > 1 ? " times" : " time");

+    ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
     // We only write out contents of the inMemSorter if it is not empty.
     if (inMemSorter.numRecords() > 0) {
       final UnsafeSorterSpillWriter spillWriter =
@@ -226,6 +222,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     // pages, we might not be able to get memory for the pointer array.

     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+    taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
     totalSpillBytes += spillSize;
     return spillSize;
   }
@@ -502,6 +499,7 @@ public long spill() throws IOException {
       UnsafeInMemorySorter.SortedIterator inMemIterator =
         ((UnsafeInMemorySorter.SortedIterator) upstream).clone();

+      ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
       // Iterate over the records that have not been returned and spill them.
       final UnsafeSorterSpillWriter spillWriter =
         new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
@@ -540,6 +538,7 @@ public long spill() throws IOException {
       inMemSorter.free();
       inMemSorter = null;
       taskContext.taskMetrics().incMemoryBytesSpilled(released);
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
Contributor commented:
Why do we create a new writeMetrics here, instead of reporting this.writeMetrics.bytesWritten?

Author replied:
We're spilling, so bytes written should be counted towards spill rather than write.
Approach similar to https://github.com/sitalkedia/spark/blob/master/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java#L138

       totalSpillBytes += released;
       return released;
     }
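To make the review exchange above concrete, here is a minimal, self-contained sketch of the per-spill metrics pattern, not the actual Spark implementation. SpillMetrics and TaskMetricsSketch are hypothetical stand-ins for Spark's ShuffleWriteMetrics and TaskMetrics, reduced to single counters: each spill gets a fresh metrics object, and its byte count is folded into the task-level diskBytesSpilled before the object is discarded.

// Hypothetical stand-in for ShuffleWriteMetrics: counts bytes written by one spill.
class SpillMetrics {
  private long bytesWritten = 0;
  void incBytesWritten(long n) { bytesWritten += n; }
  long bytesWritten() { return bytesWritten; }
}

// Hypothetical stand-in for TaskMetrics: accumulates spill bytes across all spills.
class TaskMetricsSketch {
  private long diskBytesSpilled = 0;
  void incDiskBytesSpilled(long n) { diskBytesSpilled += n; }
  long diskBytesSpilled() { return diskBytesSpilled; }
}

public class PerSpillMetricsSketch {
  private final TaskMetricsSketch taskMetrics = new TaskMetricsSketch();

  // Models one spill: a fresh SpillMetrics is created so that bytes written
  // here count as spill output, not shuffle-write output, and the total is
  // folded into the task-level metric before the local object goes away.
  long spill(byte[][] records) {
    SpillMetrics writeMetrics = new SpillMetrics();
    for (byte[] record : records) {
      // Stand-in for UnsafeSorterSpillWriter reporting bytes through the
      // metrics object it was constructed with.
      writeMetrics.incBytesWritten(record.length);
    }
    taskMetrics.incDiskBytesSpilled(writeMetrics.bytesWritten());
    return writeMetrics.bytesWritten();
  }

  public static void main(String[] args) {
    PerSpillMetricsSketch sorter = new PerSpillMetricsSketch();
    sorter.spill(new byte[][] {new byte[800], new byte[800]});
    sorter.spill(new byte[][] {new byte[800]});
    // Prints 2400: both spills survive in the task-level counter even though
    // each per-spill SpillMetrics object was discarded.
    System.out.println(sorter.taskMetrics.diskBytesSpilled());
  }
}

Because the object is created per spill, writeMetrics.bytesWritten() is exactly the byte count of that spill alone, which is what incDiskBytesSpilled needs; a long-lived field would accumulate across spills and force delta bookkeeping.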
@@ -405,6 +405,31 @@ public void forcedSpillingWithoutComparator() throws Exception {
     assertSpillFilesWereCleanedUp();
   }

+  @Test
+  public void testDiskSpilledBytes() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    long[] record = new long[100];
+    int recordSize = record.length * 8;
+    int n = (int) pageSizeBytes / recordSize * 3;
+    for (int i = 0; i < n; i++) {
+      record[0] = (long) i;
+      sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, false);
+    }
+    // We will have at least 2 memory pages allocated because of the rounding from
+    // the integer division of pageSizeBytes by recordSize.
+    assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
+    assertTrue(taskContext.taskMetrics().diskBytesSpilled() == 0);
+    UnsafeExternalSorter.SpillableIterator iter =
+      (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
+    assertTrue(iter.spill() > 0);
+    assertTrue(taskContext.taskMetrics().diskBytesSpilled() > 0);
+    assertEquals(0, iter.spill());
+    // Even though we did not spill a second time, the disk bytes spilled should still be non-zero.
+    assertTrue(taskContext.taskMetrics().diskBytesSpilled() > 0);
+    sorter.cleanupResources();
+    assertSpillFilesWereCleanedUp();
+  }
+
   @Test
   public void testPeakMemoryUsed() throws Exception {
     final long recordLengthBytes = 8;