Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-11805] free the array in UnsafeExternalSorter during spilling #9793

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,12 @@ public long spill() throws IOException {
}
allocatedPages.clear();
}

// in-memory sorter will not be used after spilling
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
inMemSorter.free();
inMemSorter = null;
return released;
}
}
Expand All @@ -489,10 +495,6 @@ public void loadNext() throws IOException {
}
upstream = nextUpstream;
nextUpstream = null;

assert(inMemSorter != null);
inMemSorter.free();
inMemSorter = null;
}
numRecords--;
upstream.loadNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public UnsafeInMemorySorter(
*/
public void free() {
consumer.freeArray(array);
array = null;
}

public void reset() {
Expand Down Expand Up @@ -160,28 +161,22 @@ public void insertRecord(long recordPointer, long keyPrefix) {
pos++;
}

public static final class SortedIterator extends UnsafeSorterIterator {
public final class SortedIterator extends UnsafeSorterIterator {

private final TaskMemoryManager memoryManager;
private final int sortBufferInsertPosition;
private final LongArray sortBuffer;
private int position = 0;
private final int numRecords;
private int position;
private Object baseObject;
private long baseOffset;
private long keyPrefix;
private int recordLength;

private SortedIterator(
TaskMemoryManager memoryManager,
int sortBufferInsertPosition,
LongArray sortBuffer) {
this.memoryManager = memoryManager;
this.sortBufferInsertPosition = sortBufferInsertPosition;
this.sortBuffer = sortBuffer;
private SortedIterator(int numRecords) {
this.numRecords = numRecords;
this.position = 0;
}

public SortedIterator clone () {
SortedIterator iter = new SortedIterator(memoryManager, sortBufferInsertPosition, sortBuffer);
SortedIterator iter = new SortedIterator(numRecords);
iter.position = position;
iter.baseObject = baseObject;
iter.baseOffset = baseOffset;
Expand All @@ -192,21 +187,21 @@ public SortedIterator clone () {

@Override
public boolean hasNext() {
return position < sortBufferInsertPosition;
return position / 2 < numRecords;
}

public int numRecordsLeft() {
return (sortBufferInsertPosition - position) / 2;
return numRecords - position / 2;
}

@Override
public void loadNext() {
// This pointer points to a 4-byte record length, followed by the record's bytes
final long recordPointer = sortBuffer.get(position);
final long recordPointer = array.get(position);
baseObject = memoryManager.getPage(recordPointer);
baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
recordLength = Platform.getInt(baseObject, baseOffset - 4);
keyPrefix = sortBuffer.get(position + 1);
keyPrefix = array.get(position + 1);
position += 2;
}

Expand All @@ -229,6 +224,6 @@ public void loadNext() {
*/
public SortedIterator getSortedIterator() {
sorter.sort(array, 0, pos / 2, sortComparator);
return new SortedIterator(memoryManager, pos, array);
return new SortedIterator(pos / 2);
}
}