diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index cbb32986b1b03..dc7247e8d035c 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -203,16 +203,16 @@ class ExternalMerger(Merger):
     >>> agg = SimpleAggregator(lambda x, y: x + y)
     >>> merger = ExternalMerger(agg, 10)
     >>> N = 10000
-    >>> merger.mergeValues(zip(xrange(N), xrange(N)) * 10)
+    >>> merger.mergeValues(zip(xrange(N), xrange(N)))
     >>> assert merger.spills > 0
     >>> sum(v for k,v in merger.iteritems())
-    499950000
+    49995000

     >>> merger = ExternalMerger(agg, 10)
-    >>> merger.mergeCombiners(zip(xrange(N), xrange(N)) * 10)
+    >>> merger.mergeCombiners(zip(xrange(N), xrange(N)))
     >>> assert merger.spills > 0
     >>> sum(v for k,v in merger.iteritems())
-    499950000
+    49995000
     """

     # the max total partitions created recursively
@@ -376,7 +376,6 @@ def _external_items(self):
         if any(self.pdata):
             self._spill()
         self.pdata = []
-        hard_limit = self._next_limit()

         try:
             for i in range(self.partitions):
@@ -384,7 +383,6 @@ def _external_items(self):
                     yield v
                 self.data.clear()
                 gc.collect()
-                hard_limit = self._next_limit()

                 # remove the merged partition
                 for j in range(self.spills):
@@ -393,8 +391,9 @@ def _external_items(self):
         finally:
             self._cleanup()

-    def _merged_items(self, index, limit=0):
+    def _merged_items(self, index):
         self.data = {}
+        limit = self._next_limit()
         for j in range(self.spills):
             path = self._get_spill_dir(j)
             p = os.path.join(path, str(index))
@@ -411,11 +410,6 @@ def _merged_items(self, index, limit=0):

         return self.data.iteritems()

-    def _cleanup(self):
-        """ Clean up all the files in disks """
-        for d in self.localdirs:
-            shutil.rmtree(d, True)
-
     def _recursive_merged_items(self, index):
         """
         merge the partitioned items and return the as iterator
@@ -440,6 +434,11 @@ def _recursive_merged_items(self, index):

         return m._external_items()

+    def _cleanup(self):
+        """ Clean up all the files in disks """
+        for d in self.localdirs:
+            shutil.rmtree(d, True)
+

 class ExternalSorter(object):
     """
@@ -572,7 +571,7 @@ def __iter__(self):
         for v in self.values:
             yield v

-        if not self.groupBy or not self.groupBy.next_item:
+        if not self.groupBy or self.groupBy.next_item:
             # different key was already found by previous accessing
             return

@@ -777,14 +776,14 @@ def _spill(self):
         gc.collect()  # release the memory as much as possible
         MemoryBytesSpilled += (used_memory - get_used_memory()) << 20

-    def _merged_items(self, index, limit=0):
+    def _merged_items(self, index):
         size = sum(os.path.getsize(os.path.join(self._get_spill_dir(j), str(index)))
                    for j in range(self.spills))
         # if the memory can not hold all the partition,
         # then use sort based merge. Because of compression,
         # the data on disks will be much smaller than needed memory
         if (size >> 20) >= self.memory_limit / 10:
-            return self._sorted_items(index)
+            return self._merge_sorted_items(index)

         self.data = {}
         for j in range(self.spills):
@@ -794,7 +793,7 @@ def _merged_items(self, index, limit=0):
             self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
         return self.data.iteritems()

-    def _sorted_items(self, index):
+    def _merge_sorted_items(self, index):
         """ load a partition from disk, then sort and group by key """
         def load_partition(j):
             path = self._get_spill_dir(j)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 1b2f9a5bb82f9..128d1bcb61cdb 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -70,7 +70,7 @@ class TestMerger(unittest.TestCase):

     def setUp(self):
-        self.N = 1 << 16
+        self.N = 1 << 12
         self.l = [i for i in xrange(self.N)]
         self.data = zip(self.l, self.l)
         self.agg = Aggregator(lambda x: [x],