Skip to content

Commit

Permalink
fix merge conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Aug 27, 2014
1 parent 2c1d05b commit 779ed03
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion python/pyspark/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,23 @@ class ExternalSorter(object):
The spilling will only happen when the used memory goes above
the limit.
>>> sorter = ExternalSorter(1) # 1M
>>> import random
>>> l = range(1024)
>>> random.shuffle(l)
>>> sorted(l) == list(sorter.sorted(l))
True
>>> sorted(l) == list(sorter.sorted(l, key=lambda x: -x, reverse=True))
True
"""
def __init__(self, memory_limit, serializer=None):
self.memory_limit = memory_limit
self.local_dirs = _get_local_dirs("sort")
self.serializer = serializer or BatchedSerializer(PickleSerializer(), 1024)
self.serializer = serializer or BatchedSerializer(
CompressedSerializer(PickleSerializer()), 1024)
self._spilled_bytes = 0

def _get_path(self, n):
""" Choose one directory for spill by number n """
Expand Down Expand Up @@ -472,6 +484,7 @@ def sorted(self, iterator, key=None, reverse=False):
path = self._get_path(len(chunks))
with open(path, 'w') as f:
self.serializer.dump_stream(current_chunk, f)
self._spilled_bytes += os.path.getsize(path)
chunks.append(self.serializer.load_stream(open(path)))
os.unlink(path) # data will be deleted after close
current_chunk = []
Expand All @@ -486,6 +499,7 @@ def sorted(self, iterator, key=None, reverse=False):

if current_chunk:
chunks.append(iter(current_chunk))

return heapq.merge(chunks, key=key, reverse=reverse)


Expand Down

0 comments on commit 779ed03

Please sign in to comment.