switch to sort based groupBy, based on size of data
davies committed Aug 19, 2014
1 parent 1ea0669 commit 3ee58e5
61 changes: 38 additions & 23 deletions python/pyspark/shuffle.py
@@ -339,38 +339,37 @@ def _external_items(self):

         try:
             for i in range(self.partitions):
-                self.data = {}
-                for j in range(self.spills):
-                    path = self._get_spill_dir(j)
-                    p = os.path.join(path, str(i))
-                    # do not check memory during merging
-                    self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
-
-                    # limit the total partitions
-                    if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
-                            and j < self.spills - 1
-                            and get_used_memory() > hard_limit):
-                        self.data.clear()  # will read from disk again
-                        gc.collect()  # release the memory as much as possible
-                        for v in self._recursive_merged_items(i):
-                            yield v
-                        break
-                else:
-                    for v in self.data.iteritems():
-                        yield v
-                    self.data.clear()
-
+                for v in self._merged_items(i):
+                    yield v
+                self.data.clear()
                 gc.collect()
-                hard_limit = self._next_limit()
 
                 # remove the merged partition
                 for j in range(self.spills):
                     path = self._get_spill_dir(j)
                     os.remove(os.path.join(path, str(i)))
 
         finally:
             self._cleanup()
 
+    def _merged_items(self, index, limit=0):
+        self.data = {}
+        for j in range(self.spills):
+            path = self._get_spill_dir(j)
+            p = os.path.join(path, str(index))
+            # do not check memory during merging
+            self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
+
+            # limit the total partitions
+            if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
+                    and j < self.spills - 1
+                    and get_used_memory() > limit):
+                self.data.clear()  # will read from disk again
+                gc.collect()  # release the memory as much as possible
+                return self._recursive_merged_items(index)
+
+        return self.data.iteritems()
+
     def _cleanup(self):
         """ Clean up all the files in disks """
         for d in self.localdirs:
@@ -603,7 +602,23 @@ def _spill(self):
         self.spills += 1
         gc.collect()  # release the memory as much as possible
 
-    def _recursive_merged_items(self, index):
+    def _merged_items(self, index, limit=0):
+        size = sum(os.path.getsize(os.path.join(self._get_spill_dir(j), str(index)))
+                   for j in range(self.spills))
+        # if memory cannot hold the whole partition,
+        # then use sort-based merge
+        if (size >> 20) > self.memory_limit / 2:
+            return self._sorted_items(index)
+
+        self.data = {}
+        for j in range(self.spills):
+            path = self._get_spill_dir(j)
+            p = os.path.join(path, str(index))
+            # do not check memory during merging
+            self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
+        return self.data.iteritems()
+
+    def _sorted_items(self, index):
         """ load a partition from disk, then sort and group by key """
         def load_partition(j):
             path = self._get_spill_dir(j)
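
The idea of the switch, in brief: a partition's spill files live on disk, so their total size is known before merging, and hash-based merging (one dict holding every key) is only safe when that size is comfortably under the memory budget; otherwise the partition is sorted and grouped instead. Below is a minimal, self-contained Python 3 sketch of that dispatch. It is illustrative, not PySpark's API: merge_partition, read_records, and memory_limit_mb are hypothetical names, and the in-memory sorted() stands in for the external, disk-backed sort a real implementation (like Spark's ExternalSorter) would use.

    import os
    from itertools import groupby
    from operator import itemgetter

    def merge_partition(spill_paths, read_records, memory_limit_mb):
        """Merge one partition's spill files into (key, [values]) pairs.

        read_records(path) must yield (key, value) pairs from one spill file.
        """
        size = sum(os.path.getsize(p) for p in spill_paths)
        # size >> 20 converts bytes to MiB, mirroring the diff above: once the
        # partition outgrows half the memory budget, avoid the in-memory dict.
        if (size >> 20) > memory_limit_mb / 2:
            # sort-based path: order all records by key, then group adjacent
            # equal keys (a real implementation would sort externally, on disk)
            records = sorted((kv for p in spill_paths for kv in read_records(p)),
                             key=itemgetter(0))
            return ((k, [v for _, v in grp])
                    for k, grp in groupby(records, key=itemgetter(0)))
        # hash-based path: the partition fits, so combine in a single dict
        data = {}
        for p in spill_paths:
            for k, v in read_records(p):
                data.setdefault(k, []).append(v)
        return iter(data.items())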
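
The sort-based path also pairs naturally with streaming: if each spill run is already sorted by key, the runs can be merged lazily and adjacent equal keys grouped on the fly, so no partition-sized dict is ever built. A self-contained sketch of that variant, again with hypothetical names (sorted_group_by, sorted_runs) rather than PySpark's:

    import heapq
    from itertools import groupby
    from operator import itemgetter

    def sorted_group_by(sorted_runs):
        """sorted_runs: iterables of (key, value) pairs, each sorted by key."""
        # heapq.merge holds only one record per run at a time, so memory use
        # is bounded by the number of runs, not by the size of the partition
        merged = heapq.merge(*sorted_runs, key=itemgetter(0))
        for key, pairs in groupby(merged, key=itemgetter(0)):
            yield key, [v for _, v in pairs]

    if __name__ == "__main__":
        run1 = [("a", 1), ("b", 2), ("c", 3)]
        run2 = [("a", 4), ("c", 5)]
        print(list(sorted_group_by([run1, run2])))
        # prints [('a', [1, 4]), ('b', [2]), ('c', [3, 5])]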
