simplify serializer, use AutoBatchedSerializer by default.
Davies Liu committed Oct 24, 2014
1 parent 222fa47 commit 8d77ef2
Showing 14 changed files with 83 additions and 173 deletions.
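For readers skimming the diff: the change consolidates PySpark's batching strategy. BatchedSerializer pickles a fixed number of records per batch (the old batchSize knob), while AutoBatchedSerializer grows the batch until the pickled payload reaches a target size, so callers no longer have to tune batch sizes by hand. The snippet below is only a simplified illustration of that adaptive idea, not the pyspark.serializers implementation; the 64 KB target and the doubling rule are assumptions.

import pickle

def fixed_batches(items, batch_size):
    # BatchedSerializer-style: a fixed number of objects per pickled chunk.
    batch = []
    for obj in items:
        batch.append(obj)
        if len(batch) == batch_size:
            yield pickle.dumps(batch)
            batch = []
    if batch:
        yield pickle.dumps(batch)

def auto_batches(items, target_bytes=1 << 16):
    # AutoBatchedSerializer-style (simplified): start small and double the
    # batch while pickled chunks stay under the target size, so tiny records
    # get low per-object overhead and huge records stay memory-safe.
    it, size = iter(items), 1
    while True:
        chunk = []
        for _ in range(size):
            try:
                chunk.append(next(it))
            except StopIteration:
                if chunk:
                    yield pickle.dumps(chunk)
                return
        data = pickle.dumps(chunk)
        yield data
        if len(data) < target_bytes:
            size *= 2
        elif len(data) > target_bytes * 10 and size > 1:
            size //= 2

Judging from the doctest updates below, which drop the explicit batchSize=2 argument, the default SparkContext configuration now appears to route through the auto-batched path (batchSize == 0 in the first hunk).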
16 changes: 5 additions & 11 deletions python/pyspark/context.py
@@ -116,9 +116,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self._conf = conf or SparkConf(_jvm=self._jvm)
         self._batchSize = batchSize  # -1 represents an unlimited batch size
         self._unbatched_serializer = serializer
-        if batchSize == 1:
-            self.serializer = self._unbatched_serializer
-        elif batchSize == 0:
+        if batchSize == 0:
             self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
         else:
             self.serializer = BatchedSerializer(self._unbatched_serializer,
@@ -306,12 +304,8 @@ def parallelize(self, c, numSlices=None):
         # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(c):
             c = list(c)  # Make it a list so we can compute its length
-        batchSize = min(len(c) // numSlices, self._batchSize)
-        if batchSize > 1:
-            serializer = BatchedSerializer(self._unbatched_serializer,
-                                           batchSize)
-        else:
-            serializer = self._unbatched_serializer
+        batchSize = max(1, min(len(c) // numSlices, self._batchSize))
+        serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
         serializer.dump_stream(c, tempFile)
         tempFile.close()
         readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
@@ -432,7 +426,7 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
         batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
-        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
+        ser = BatchedSerializer(PickleSerializer())
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
                                                 keyConverter, valueConverter, minSplits, batchSize)
         return RDD(jrdd, self, ser)
@@ -837,7 +831,7 @@ def _test():
     import doctest
     import tempfile
     globs = globals().copy()
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    globs['sc'] = SparkContext('local[4]', 'PythonTest')
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
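The rewritten parallelize() above always wraps the data in a BatchedSerializer and clamps the computed batch size to at least 1, where the removed branch fell back to an unbatched serializer for small collections. A quick worked example of the new formula (the 1024 value is just an illustrative configured batch size, not necessarily the default):

def parallelize_batch_size(num_items, num_slices, configured_batch_size):
    # Mirrors the added line: max(1, min(len(c) // numSlices, self._batchSize))
    return max(1, min(num_items // num_slices, configured_batch_size))

print(parallelize_batch_size(10, 4, 1024))  # 10 // 4 = 2 -> two items per pickled batch
print(parallelize_batch_size(3, 4, 1024))   # 3 // 4 = 0 -> clamped to 1, data still spreads over slices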
4 changes: 2 additions & 2 deletions python/pyspark/mllib/clustering.py
@@ -16,7 +16,7 @@
 #
 
 from pyspark import SparkContext
-from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.serializers import PickleSerializer
 from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
 
 __all__ = ['KMeansModel', 'KMeans']
@@ -83,7 +83,7 @@ def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"
         sc = rdd.context
         ser = PickleSerializer()
         # cache serialized data to avoid objects over head in JVM
-        cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
+        cached = rdd.map(_convert_to_vector)._pickled().cache()
         model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
             _to_java_object_rdd(cached), k, maxIterations, runs, initializationMode)
         bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
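Several of the MLlib changes in this commit replace explicit _reserialize(AutoBatchedSerializer(PickleSerializer())) calls with an rdd._pickled() helper. That helper lives in python/pyspark/rdd.py, one of the changed files not rendered on this page; judging from the call sites it replaces, it presumably amounts to something like the following sketch rather than this exact code:

from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

def _pickled(self):
    # Re-serialize this RDD with auto-batched pickling, matching the explicit
    # _reserialize(AutoBatchedSerializer(PickleSerializer())) expressions it
    # replaces throughout mllib. (Sketch inferred from the diff, not rdd.py.)
    return self._reserialize(AutoBatchedSerializer(PickleSerializer()))

Caching the RDD in this pickled, batched form appears to keep the cached partitions as relatively few large byte buffers in the JVM, which is what the nearby comments about reducing object overhead refer to.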
2 changes: 1 addition & 1 deletion python/pyspark/mllib/feature.py
@@ -18,7 +18,7 @@
 """
 Python package for feature in MLlib.
 """
-from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.serializers import PickleSerializer
 from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
 
 __all__ = ['Word2Vec', 'Word2VecModel']
5 changes: 1 addition & 4 deletions python/pyspark/mllib/linalg.py
@@ -29,8 +29,6 @@
 
 import numpy as np
 
-from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
-
 __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
 
 
@@ -59,8 +57,7 @@ def _to_java_object_rdd(rdd):
     It will convert each Python object into Java object by Pyrolite, whenever the
     RDD is serialized in batch or not.
     """
-    rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
-    return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)
+    return rdd.ctx._jvm.SerDe.pythonToJava(rdd._pickled()._jrdd, True)
 
 
 def _convert_to_vector(l):
4 changes: 1 addition & 3 deletions python/pyspark/mllib/random.py
@@ -22,7 +22,6 @@
 from functools import wraps
 
 from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer
 
 
 __all__ = ['RandomRDDs', ]
@@ -32,8 +31,7 @@ def serialize(f):
     @wraps(f)
     def func(sc, *a, **kw):
         jrdd = f(sc, *a, **kw)
-        return RDD(sc._jvm.SerDe.javaToPython(jrdd), sc,
-                   BatchedSerializer(PickleSerializer(), 1024))
+        return RDD(sc._jvm.SerDe.javaToPython(jrdd), sc)
     return func
 
 
14 changes: 5 additions & 9 deletions python/pyspark/mllib/recommendation.py
@@ -16,7 +16,6 @@
 #
 
 from pyspark import SparkContext
-from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 from pyspark.rdd import RDD
 from pyspark.mllib.linalg import _to_java_object_rdd
 
@@ -97,22 +96,19 @@ def predictAll(self, user_product):
         sc = self._context
         tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd())
         jresult = self._java_model.predict(tuplerdd).toJavaRDD()
-        return RDD(sc._jvm.SerDe.javaToPython(jresult), sc,
-                   AutoBatchedSerializer(PickleSerializer()))
+        return RDD(sc._jvm.SerDe.javaToPython(jresult), sc)
 
     def userFeatures(self):
         sc = self._context
         juf = self._java_model.userFeatures()
         juf = sc._jvm.SerDe.fromTuple2RDD(juf).toJavaRDD()
-        return RDD(sc._jvm.PythonRDD.javaToPython(juf), sc,
-                   AutoBatchedSerializer(PickleSerializer()))
+        return RDD(sc._jvm.PythonRDD.javaToPython(juf), sc)
 
     def productFeatures(self):
         sc = self._context
         jpf = self._java_model.productFeatures()
         jpf = sc._jvm.SerDe.fromTuple2RDD(jpf).toJavaRDD()
-        return RDD(sc._jvm.PythonRDD.javaToPython(jpf), sc,
-                   AutoBatchedSerializer(PickleSerializer()))
+        return RDD(sc._jvm.PythonRDD.javaToPython(jpf), sc)
 
 
 class ALS(object):
@@ -128,7 +124,7 @@ def _prepare(cls, ratings):
             raise ValueError("rating should be RDD of Rating or tuple/list")
         # serialize them by AutoBatchedSerializer before cache to reduce the
         # objects overhead in JVM
-        cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
+        cached = ratings._pickled().cache()
         return _to_java_object_rdd(cached)
 
     @classmethod
@@ -151,7 +147,7 @@ def _test():
     import doctest
     import pyspark.mllib.recommendation
     globs = pyspark.mllib.recommendation.__dict__.copy()
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    globs['sc'] = SparkContext('local[4]', 'PythonTest')
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
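The recommendation, random, tree, and util changes all drop the explicit serializer argument when wrapping a Java-side RDD, e.g. RDD(sc._jvm.SerDe.javaToPython(jresult), sc). For behaviour to be preserved, the RDD constructor's default deserializer (also defined in python/pyspark/rdd.py, not shown on this page) presumably becomes the auto-batched pickle serializer, roughly along these lines:

from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

class RDD(object):
    # Assumed new default, inferred from the call sites in this diff; the real
    # signature and body live in python/pyspark/rdd.py.
    def __init__(self, jrdd, ctx,
                 jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
        self._jrdd = jrdd
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer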
4 changes: 2 additions & 2 deletions python/pyspark/mllib/regression.py
@@ -19,7 +19,7 @@
 from numpy import array
 
 from pyspark import SparkContext
-from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.serializers import PickleSerializer
 from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
 
 __all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
@@ -130,7 +130,7 @@ def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights)
     initial_bytes = bytearray(ser.dumps(_convert_to_vector(initial_weights)))
     # use AutoBatchedSerializer before cache to reduce the memory
     # overhead in JVM
-    cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
+    cached = data._pickled().cache()
     ans = train_func(_to_java_object_rdd(cached), initial_bytes)
     assert len(ans) == 2, "JVM call result had unexpected length"
     weights = ser.loads(str(ans[0]))
4 changes: 2 additions & 2 deletions python/pyspark/mllib/tree.py
@@ -18,7 +18,7 @@
 from py4j.java_collections import MapConverter
 
 from pyspark import SparkContext, RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import PickleSerializer
 from pyspark.mllib.linalg import Vector, _convert_to_vector, _to_java_object_rdd
 from pyspark.mllib.regression import LabeledPoint
 
@@ -63,7 +63,7 @@ def predict(self, x):
             x = x.map(_convert_to_vector)
             jPred = self._java_model.predict(_to_java_object_rdd(x)).toJavaRDD()
             jpyrdd = self._sc._jvm.SerDe.javaToPython(jPred)
-            return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
+            return RDD(jpyrdd, self._sc)
 
         else:
             # Assume x is a single data point.
3 changes: 1 addition & 2 deletions python/pyspark/mllib/util.py
@@ -19,7 +19,6 @@
 import warnings
 
 from pyspark.rdd import RDD
-from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
 from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint
 
@@ -175,7 +174,7 @@ def loadLabeledPoints(sc, path, minPartitions=None):
         minPartitions = minPartitions or min(sc.defaultParallelism, 2)
         jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
         jpyrdd = sc._jvm.SerDe.javaToPython(jrdd)
-        return RDD(jpyrdd, sc, AutoBatchedSerializer(PickleSerializer()))
+        return RDD(jpyrdd, sc)
 
 
 def _test():
(Diffs for the remaining five changed files were not loaded on this page.)
