From e80647e6031890cf944d323650e253bb87fcce29 Mon Sep 17 00:00:00 2001 From: giwa Date: Mon, 18 Aug 2014 17:37:28 -0700 Subject: [PATCH] Compress the pickled Python command with CompressedSerializer --- python/pyspark/streaming/dstream.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index a36f4b9bf9d87..0e2641e33032f 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -20,7 +20,8 @@ import operator from pyspark.serializers import NoOpSerializer,\ - BatchedSerializer, CloudPickleSerializer, pack_long + BatchedSerializer, CloudPickleSerializer, pack_long,\ + CompressedSerializer from pyspark.rdd import _JavaStackTrace from pyspark.storagelevel import StorageLevel from pyspark.resultiterable import ResultIterable @@ -463,7 +464,8 @@ def _jdstream(self): serializer = self.ctx.serializer command = (self.func, self._prev_jrdd_deserializer, serializer) - pickled_command = CloudPickleSerializer().dumps(command) + ser = CompressedSerializer(CloudPickleSerializer()) + pickled_command = ser.dumps(command) broadcast_vars = ListConverter().convert( [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], self.ctx._gateway._gateway_client) @@ -472,12 +474,13 @@ def _jdstream(self): env = MapConverter().convert(self.ctx.environment, self.ctx._gateway._gateway_client) includes = ListConverter().convert(self.ctx._python_includes, - self.ctx._gateway._gateway_client) + self.ctx._gateway._gateway_client) python_dstream = self.ctx._jvm.PythonDStream(self._prev_jdstream.dstream(), - bytearray(pickled_command), - env, includes, self.preservesPartitioning, - self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, - class_tag) + bytearray(pickled_command), + env, includes, self.preservesPartitioning, + self.ctx.pythonExec, + broadcast_vars, self.ctx._javaAccumulator, + class_tag) self._jdstream_val = python_dstream.asJavaDStream() 
return self._jdstream_val