Commit 4b07d39

compress the data while spilling

davies committed Aug 20, 2014
1 parent 0a081c6
Showing 1 changed file with 12 additions and 3 deletions.
python/pyspark/shuffle.py (15 changes: 12 additions & 3 deletions)
@@ -25,7 +25,8 @@
 import operator
 
 import pyspark.heapq3 as heapq
-from pyspark.serializers import BatchedSerializer, PickleSerializer, FlattedValuesSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer, FlattedValuesSerializer, \
+    CompressedSerializer
 
 try:
     import psutil
@@ -204,8 +205,16 @@ def __init__(self, aggregator, memory_limit=512, serializer=None,
         Merger.__init__(self, aggregator)
         self.memory_limit = memory_limit
         # default serializer is only used for tests
-        self.serializer = serializer or \
-            BatchedSerializer(PickleSerializer(), 1024)
+        self.serializer = serializer or PickleSerializer()
+        # add compression
+        if isinstance(self.serializer, BatchedSerializer):
+            if not isinstance(self.serializer.serializer, CompressedSerializer):
+                self.serializer = BatchedSerializer(
+                    CompressedSerializer(self.serializer.serializer),
+                    self.serializer.batchSize)
+        else:
+            if not isinstance(self.serializer, CompressedSerializer):
+                self.serializer = CompressedSerializer(self.serializer)
         self.localdirs = localdirs or _get_local_dirs(str(id(self)))
         # number of partitions when spill data into disks
         self.partitions = partitions
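
Note: the added block guarantees that whatever serializer the merger ends up with, spilled bytes are compressed exactly once. A BatchedSerializer keeps batching outermost and gets a CompressedSerializer spliced in around its inner serializer; a bare serializer is wrapped directly; an already-compressed serializer is left untouched. Below is a minimal, self-contained sketch of that decision tree. The class bodies are illustrative stand-ins built on pickle and zlib, not PySpark's real serializers, and the helper with_compression is a hypothetical name for the inline logic in __init__ above.

import pickle
import zlib

class PickleSerializer(object):
    # Stand-in: pickle one object to bytes.
    def dumps(self, obj):
        return pickle.dumps(obj, protocol=2)

    def loads(self, data):
        return pickle.loads(data)

class CompressedSerializer(object):
    # Stand-in: zlib-compress the wrapped serializer's output.
    # Level 1 favors speed, which suits throughput-bound spilling.
    def __init__(self, serializer):
        self.serializer = serializer

    def dumps(self, obj):
        return zlib.compress(self.serializer.dumps(obj), 1)

    def loads(self, data):
        return self.serializer.loads(zlib.decompress(data))

class BatchedSerializer(object):
    # Stand-in: only the attributes the wrapping logic inspects.
    def __init__(self, serializer, batchSize):
        self.serializer = serializer
        self.batchSize = batchSize

def with_compression(serializer):
    # Same decision tree as the diff: keep batching outermost,
    # compress the innermost serializer, never double-wrap.
    if isinstance(serializer, BatchedSerializer):
        if not isinstance(serializer.serializer, CompressedSerializer):
            return BatchedSerializer(
                CompressedSerializer(serializer.serializer),
                serializer.batchSize)
        return serializer
    if not isinstance(serializer, CompressedSerializer):
        return CompressedSerializer(serializer)
    return serializer

if __name__ == "__main__":
    ser = with_compression(BatchedSerializer(PickleSerializer(), 1024))
    batch = ["spill"] * 1000
    blob = ser.serializer.dumps(batch)          # compressed pickled batch
    assert ser.serializer.loads(blob) == batch
    assert len(blob) < len(pickle.dumps(batch, protocol=2))

Keeping the CompressedSerializer innermost means each pickled batch is compressed as one unit rather than item by item, which is presumably where most of the spill-size reduction comes from.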
