
Commit 1f69f93: fix tests

davies committed Sep 25, 2014
1 parent 0d3395f
Showing 2 changed files with 16 additions and 17 deletions.
31 changes: 15 additions & 16 deletions python/pyspark/shuffle.py
@@ -203,16 +203,16 @@ class ExternalMerger(Merger):
     >>> agg = SimpleAggregator(lambda x, y: x + y)
     >>> merger = ExternalMerger(agg, 10)
     >>> N = 10000
-    >>> merger.mergeValues(zip(xrange(N), xrange(N)) * 10)
+    >>> merger.mergeValues(zip(xrange(N), xrange(N)))
     >>> assert merger.spills > 0
     >>> sum(v for k,v in merger.iteritems())
-    499950000
+    49995000
     >>> merger = ExternalMerger(agg, 10)
-    >>> merger.mergeCombiners(zip(xrange(N), xrange(N)) * 10)
+    >>> merger.mergeCombiners(zip(xrange(N), xrange(N)))
     >>> assert merger.spills > 0
     >>> sum(v for k,v in merger.iteritems())
-    499950000
+    49995000
     """

     # the max total partitions created recursively
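The new expected outputs are just the sum of a single copy of the data; the old doctests fed the merger ten copies of each pair. A quick arithmetic check, independent of PySpark:

    >>> N = 10000
    >>> sum(xrange(N))
    49995000
    >>> 10 * sum(xrange(N))
    499950000

With SimpleAggregator summing values per key, one pass over zip(xrange(N), xrange(N)) totals 49995000, and the dropped * 10 duplication accounts for the old 499950000.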
@@ -376,15 +376,13 @@ def _external_items(self):
         if any(self.pdata):
             self._spill()
         self.pdata = []
-        hard_limit = self._next_limit()

         try:
             for i in range(self.partitions):
                 for v in self._merged_items(i):
                     yield v
                 self.data.clear()
                 gc.collect()
-                hard_limit = self._next_limit()

                 # remove the merged partition
                 for j in range(self.spills):
@@ -393,8 +391,9 @@ def _external_items(self):
         finally:
             self._cleanup()

-    def _merged_items(self, index, limit=0):
+    def _merged_items(self, index):
         self.data = {}
+        limit = self._next_limit()
         for j in range(self.spills):
             path = self._get_spill_dir(j)
             p = os.path.join(path, str(index))
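With the `limit` parameter gone, `_merged_items` now computes a fresh memory ceiling on entry via `_next_limit()` rather than receiving one from `_external_items`, where the call site left it at the `0` default and the removed `hard_limit` assignments were never consumed. A hedged sketch of the resulting flow; `merge_spill_file` is a hypothetical stand-in for the real open/load/mergeCombiners calls, and the fallback to `_recursive_merged_items` in the elided lines is an assumption:

    def _merged_items(self, index):
        self.data = {}
        limit = self._next_limit()               # fresh ceiling for this partition
        for j in range(self.spills):
            merge_spill_file(self, j, index)     # hypothetical: load spill j and merge it
            if get_used_memory() > limit:        # assumed guard in the elided body
                # partition will not fit in memory: re-partition and merge recursively
                return self._recursive_merged_items(index)
        return self.data.iteritems()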
@@ -411,11 +410,6 @@ def _merged_items(self, index):

         return self.data.iteritems()

-    def _cleanup(self):
-        """ Clean up all the files in disks """
-        for d in self.localdirs:
-            shutil.rmtree(d, True)
-
     def _recursive_merged_items(self, index):
         """
         merge the partitioned items and return the as iterator
@@ -440,6 +434,11 @@ def _recursive_merged_items(self, index):

         return m._external_items()

+    def _cleanup(self):
+        """ Clean up all the files in disks """
+        for d in self.localdirs:
+            shutil.rmtree(d, True)
+

 class ExternalSorter(object):
     """
@@ -572,7 +571,7 @@ def __iter__(self):
             for v in self.values:
                 yield v

-        if not self.groupBy or not self.groupBy.next_item:
+        if not self.groupBy or self.groupBy.next_item:
             # different key was already found by previous accessing
             return
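The dropped `not` makes the code match its comment: `next_item` being set means the underlying grouping iterator already pulled a pair belonging to a different key, so this key's values are exhausted and the iterator should stop. A minimal sketch of that hand-off, with simplified stand-ins for the real classes in shuffle.py (class and attribute layout here is illustrative, not the full implementation):

    class GroupByKey(object):
        """Sketch: wraps a key-sorted (key, value) stream."""
        def __init__(self, it):
            self.it = iter(it)
            self.next_item = None    # (key, value) parked when a new key shows up

    class SameKey(object):
        """Sketch: yields the values of one key, then parks the next key."""
        def __init__(self, key, values, groupBy):
            self.key, self.values, self.groupBy = key, values, groupBy

        def __iter__(self):
            for v in self.values:                 # values buffered so far
                yield v
            if not self.groupBy or self.groupBy.next_item:
                return                            # a different key was already found
            for k, v in self.groupBy.it:          # keep consuming the shared stream
                if k != self.key:
                    self.groupBy.next_item = (k, v)   # park it for the next group
                    return
                yield v

With the old condition, a parked `next_item` would have sent the iterator back into the shared stream past the parked pair, misattributing values to the wrong key, which is presumably among the failures this "fix tests" commit addresses.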

@@ -777,14 +776,14 @@ def _spill(self):
         gc.collect()  # release the memory as much as possible
         MemoryBytesSpilled += (used_memory - get_used_memory()) << 20

-    def _merged_items(self, index, limit=0):
+    def _merged_items(self, index):
         size = sum(os.path.getsize(os.path.join(self._get_spill_dir(j), str(index)))
                    for j in range(self.spills))
         # if the memory can not hold all the partition,
         # then use sort based merge. Because of compression,
         # the data on disks will be much smaller than needed memory
         if (size >> 20) >= self.memory_limit / 10:
-            return self._sorted_items(index)
+            return self._merge_sorted_items(index)

         self.data = {}
         for j in range(self.spills):
@@ -794,7 +793,7 @@ def _merged_items(self, index):
             self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
         return self.data.iteritems()

-    def _sorted_items(self, index):
+    def _merge_sorted_items(self, index):
         """ load a partition from disk, then sort and group by key """
         def load_partition(j):
             path = self._get_spill_dir(j)
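Two things happen in these hunks: `_sorted_items` is renamed to the more descriptive `_merge_sorted_items`, and the unused `limit` parameter is dropped from this `_merged_items` as well. The guard choosing between the two paths is worth unpacking: `size` is in bytes, so `size >> 20` converts it to MB, and any partition whose spill files already occupy a tenth of `memory_limit` (itself in MB) goes down the sort-based path, because decompressed, deserialized data is far larger than its on-disk form. A worked check:

    >>> size = 300 << 20             # 300 MB of spill files for one partition
    >>> memory_limit = 512           # MB
    >>> (size >> 20) >= memory_limit / 10
    True

So a 300 MB partition against a 512 MB budget is merged by sorting the spilled runs rather than rebuilding a dict in memory.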
2 changes: 1 addition & 1 deletion python/pyspark/tests.py
@@ -70,7 +70,7 @@
 class TestMerger(unittest.TestCase):

     def setUp(self):
-        self.N = 1 << 16
+        self.N = 1 << 12
         self.l = [i for i in xrange(self.N)]
         self.data = zip(self.l, self.l)
         self.agg = Aggregator(lambda x: [x],
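The tests.py half of the fix is just a smaller dataset: 1 << 16 is 65536 pairs and 1 << 12 is 4096, a 16x reduction, presumably to keep TestMerger quick:

    >>> (1 << 16, 1 << 12)
    (65536, 4096)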
