Commit 341f1e0: add comments, refactor

davies committed Sep 18, 2014
1 parent 47918b8

Showing 1 changed file with 47 additions and 10 deletions.

python/pyspark/shuffle.py (47 additions, 10 deletions)
@@ -562,7 +562,8 @@ def __setstate__(self, item):
     def __iter__(self):
         if self._file is not None:
             self._file.flush()
-            with os.fdopen(os.dup(self._file.fileno()), 'r', 65536) as f:
+            # read all items from disk first
+            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
                 f.seek(0)
                 for values in self._ser.load_stream(f):
                     for v in values:
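
The dup()-and-reopen pattern above lets the reading handle be closed by the `with` block without closing the writer's own file object. A minimal standalone sketch of the same pattern (ordinary tempfile code, not PySpark's):

    import os
    import tempfile

    # a writer that stays open across reads, like self._file above
    w = tempfile.NamedTemporaryFile(mode='w+')
    w.write("a\nb\n")
    w.flush()

    # duplicate the descriptor: closing the reader leaves the writer intact
    with os.fdopen(os.dup(w.fileno()), 'r') as f:
        f.seek(0)
        print(f.read())   # prints "a\nb\n"

    w.write("c\n")        # the writer is still usable afterwards
    w.flush()
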
@@ -571,14 +572,19 @@ def __iter__(self):
         for v in self.values:
             yield v
 
-        if self.groupBy and self.groupBy.next_item is None:
-            for key, value in self.iterator:
-                if key == self.key:
-                    self.append(value)  # save it for next read
-                    yield value
-                else:
-                    self.groupBy.next_item = (key, value)
-                    break
+        if not self.groupBy or self.groupBy.next_item is not None:
+            # a different key was already found during a previous access
+            return
+
+        # consume the remaining items from the shared iterator
+        for key, value in self.iterator:
+            if key == self.key:
+                self.append(value)  # save it for the next access
+                yield value
+            else:
+                # remember the first pair of the next group
+                self.groupBy.next_item = (key, value)
+                break
 
     def __len__(self):
         return sum(1 for _ in self)
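
The refactored control flow is a one-item lookahead over a key-sorted stream: values are yielded while the key matches, and the first pair with a different key is parked in `groupBy.next_item` for the next group. For a plain in-memory sorted iterator, `itertools.groupby` keeps the same one-item lookahead internally; a rough equivalent of the semantics (the PySpark code additionally buffers values via `self.append` so a group can be re-read after spilling):

    from itertools import groupby
    from operator import itemgetter

    pairs = [("a", 1), ("a", 2), ("b", 3)]   # already sorted by key
    for key, values in groupby(pairs, key=itemgetter(0)):
        print(key, [v for _, v in values])
    # a [1, 2]
    # b [3]
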
@@ -616,7 +622,7 @@ def _spill(self):
 
 class ChainedIterable(object):
     """
-    Pickable chained iterator
+    Picklable chained iterator, similar to itertools.chain.from_iterable()
     """
     def __init__(self, iterators):
         self.iterators = iterators
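
A live `itertools.chain` object cannot be pickled under Python 2 (which PySpark targeted at the time), which is presumably why a small wrapper class is used: it stores the underlying iterables and rebuilds the chain lazily. A hypothetical sketch of that idea (illustrative names, not the actual class body):

    from itertools import chain

    class PicklableChain(object):
        """Holds the iterables themselves, so pickle never sees a live iterator."""
        def __init__(self, iterables):
            self.iterables = iterables     # e.g. a list of lists

        def __iter__(self):
            return chain.from_iterable(self.iterables)
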
@@ -667,6 +673,37 @@ class ExternalGroupBy(ExternalMerger):
     """
     Group the items by key. If any partition of them cannot be held
     in memory, it falls back to a sort-based group by.
+
+    This class works as follows:
+
+    - It repeatedly groups the items by key and saves them in a
+      single in-memory dict.
+
+    - When the used memory goes above the memory limit, it splits
+      the combined data into partitions by hash code and dumps them
+      to disk, one file per partition. If the number of keys in a
+      partition is smaller than 1000, it sorts the items by key
+      before dumping them to disk.
+
+    - It then goes through the rest of the iterator, grouping items
+      by key into per-hash dicts. Whenever the used memory goes over
+      the memory limit, it dumps all the dicts to disk, one file per
+      dict, and repeats until all the items have been combined. It
+      also tries to sort the items by key in each partition before
+      dumping them to disk.
+
+    - It yields the grouped items partition by partition. If the
+      data in a partition can be held in memory, it loads, combines,
+      and yields them in memory.
+
+    - If the dataset in a partition cannot be held in memory, it
+      sorts the items first. If all the files are already sorted,
+      it merges them with heapq.merge(), performing an external
+      sort across all the files.
+
+    - After sorting, the `GroupByKey` class gathers all contiguous
+      items with the same key into a group and yields the values
+      as an iterator.
     """
     SORT_KEY_LIMIT = 1000
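
The sorted-spill strategy the docstring describes can be shown end to end with a self-contained sketch, using plain lists in place of serialized spill files: each spill is sorted by key before it is written, heapq.merge() then yields a single key-ordered stream across the spills, and contiguous equal keys form the groups:

    import heapq
    from itertools import groupby
    from operator import itemgetter

    # one sorted "spill file" per dump
    spills = [
        sorted([("b", 2), ("a", 1)]),
        sorted([("a", 3), ("c", 4)]),
    ]

    # merging pre-sorted runs is the external-sort step
    merged = heapq.merge(*spills)

    # contiguous items with the same key become one group
    for key, group in groupby(merged, key=itemgetter(0)):
        print(key, [v for _, v in group])
    # a [1, 3]
    # b [2]
    # c [4]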
