Skip to content

Commit

Permalink
free the array during spilling
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Nov 18, 2015
1 parent 98be816 commit b69c1ee
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,13 @@ public long spill() throws IOException {
}
allocatedPages.clear();
}

// in-memory sorter will not be used after spilling
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
inMemSorter.free();
inMemSorter = null;
logger.warn("released {} from {}", released, this);
return released;
}
}
Expand All @@ -489,10 +496,6 @@ public void loadNext() throws IOException {
}
upstream = nextUpstream;
nextUpstream = null;

assert(inMemSorter != null);
inMemSorter.free();
inMemSorter = null;
}
numRecords--;
upstream.loadNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public UnsafeInMemorySorter(
*/
public void free() {
consumer.freeArray(array);
array = null;
}

public void reset() {
Expand Down Expand Up @@ -160,28 +161,22 @@ public void insertRecord(long recordPointer, long keyPrefix) {
pos++;
}

public static final class SortedIterator extends UnsafeSorterIterator {
public final class SortedIterator extends UnsafeSorterIterator {

private final TaskMemoryManager memoryManager;
private final int sortBufferInsertPosition;
private final LongArray sortBuffer;
private int position = 0;
private final int numRecords;
private int position;
private Object baseObject;
private long baseOffset;
private long keyPrefix;
private int recordLength;

private SortedIterator(
TaskMemoryManager memoryManager,
int sortBufferInsertPosition,
LongArray sortBuffer) {
this.memoryManager = memoryManager;
this.sortBufferInsertPosition = sortBufferInsertPosition;
this.sortBuffer = sortBuffer;
private SortedIterator(int numRecords) {
this.numRecords = numRecords;
this.position = 0;
}

public SortedIterator clone () {
SortedIterator iter = new SortedIterator(memoryManager, sortBufferInsertPosition, sortBuffer);
SortedIterator iter = new SortedIterator(numRecords);
iter.position = position;
iter.baseObject = baseObject;
iter.baseOffset = baseOffset;
Expand All @@ -192,21 +187,21 @@ public SortedIterator clone () {

@Override
public boolean hasNext() {
return position < sortBufferInsertPosition;
return position / 2 < numRecords;
}

public int numRecordsLeft() {
return (sortBufferInsertPosition - position) / 2;
return numRecords - position / 2;
}

@Override
public void loadNext() {
// This pointer points to a 4-byte record length, followed by the record's bytes
final long recordPointer = sortBuffer.get(position);
final long recordPointer = array.get(position);
baseObject = memoryManager.getPage(recordPointer);
baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
recordLength = Platform.getInt(baseObject, baseOffset - 4);
keyPrefix = sortBuffer.get(position + 1);
keyPrefix = array.get(position + 1);
position += 2;
}

Expand All @@ -229,6 +224,6 @@ public void loadNext() {
*/
public SortedIterator getSortedIterator() {
sorter.sort(array, 0, pos / 2, sortComparator);
return new SortedIterator(memoryManager, pos, array);
return new SortedIterator(pos / 2);
}
}

0 comments on commit b69c1ee

Please sign in to comment.