From 473ec4b8a31da5db2834922a1887d40ded58f9d2 Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Wed, 7 May 2014 19:04:03 -0700 Subject: [PATCH 1/2] [SPARK-1690] Tolerating empty elements when saving Python RDD to text files --- .../main/scala/org/apache/spark/api/python/PythonRDD.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index fecd9762f3f60..378f70412b317 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag]( val obj = new Array[Byte](length) stream.readFully(obj) obj + case 0 => Array.empty[Byte] case SpecialLengths.TIMING_DATA => // Timing data from worker val bootTime = stream.readLong() @@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag]( stream.readFully(update) accumulator += Collections.singletonList(update) } - Array.empty[Byte] + null } } catch { @@ -143,7 +144,7 @@ private[spark] class PythonRDD[T: ClassTag]( var _nextObj = read() - def hasNext = _nextObj.length != 0 + def hasNext = _nextObj != null } new InterruptibleIterator(context, stdoutIterator) } From c62ad33edf93902bf8a1fed9d33756dedaf4b298 Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Wed, 7 May 2014 19:14:15 -0700 Subject: [PATCH 2/2] Adding Python doctest --- python/pyspark/rdd.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 3a1c56af5b221..4f74824ba4cf2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -891,6 +891,14 @@ def saveAsTextFile(self, path): >>> from glob import glob >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' + + Empty lines are tolerated when saving to text files. + + >>> tempFile2 = NamedTemporaryFile(delete=True) + >>> tempFile2.close() + >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) + >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) + '\\n\\n\\nbar\\nfoo\\n' """ def func(split, iterator): for x in iterator: