From 31e92b72e31910be1694c348ab5de8b14f2df44b Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 21:14:56 -0700 Subject: [PATCH 1/4] Adding Java versions and associated tests --- .../apache/spark/api/java/JavaDoubleRDD.scala | 11 +++++++ .../apache/spark/api/java/JavaPairRDD.scala | 11 +++++++ .../org/apache/spark/api/java/JavaRDD.scala | 11 +++++++ .../main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../scala/org/apache/spark/JavaAPISuite.java | 21 ++++++++++++ .../streaming/api/java/JavaDStream.scala | 6 ++++ .../streaming/api/java/JavaPairDStream.scala | 6 ++++ .../apache/spark/streaming/JavaAPISuite.java | 33 +++++++++++++++++++ .../spark/streaming/JavaTestUtils.scala | 23 +++++++++++++ 9 files changed, 123 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index f9b6ee351a151..043cb183bad17 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -93,6 +93,17 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions, shuffle)) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.repartition(numPartitions)) + /** * Return an RDD with the elements from `this` that are not in `other`. * diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 268f43b4e865b..39f408b8c8c7e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -107,6 +107,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions, shuffle)) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.repartition(numPartitions)) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 662990049b093..3b359a8fd6094 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -81,6 +81,17 @@ JavaRDDLike[T, JavaRDD[T]] { def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] = rdd.coalesce(numPartitions, shuffle) + /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 17bc2515f2f43..6e88be6f6ac64 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -268,7 +268,7 @@ abstract class RDD[T: ClassManifest]( /** * Return a new RDD that has exactly numPartitions partitions. * - * Used to increase or decrease the level of parallelism in this RDD. This will use + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses * a shuffle to redistribute data. * * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 7b0bb89ab28ce..f38c607d653e5 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -472,6 +472,27 @@ public Iterable call(Iterator iter) { Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); } + @Test + public void repartition() { + // Shrinking number of partitions + JavaRDD in1 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 2); + JavaRDD repartitioned1 = in1.repartition(4); + List> result1 = repartitioned1.glom().collect(); + Assert.assertEquals(4, result1.size()); + for (List l: result1) { + Assert.assertTrue(l.size() > 0); + } + + // Growing number of partitions + JavaRDD in2 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 4); + JavaRDD repartitioned2 = in2.repartition(2); + List> result2 = repartitioned2.glom().collect(); + Assert.assertEquals(2, result2.size()); + for (List l: result2) { + Assert.assertTrue(l.size() > 0); + } + } + @Test public void persist() { JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index d1932b6b05a09..1a2aeaa8797e1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -94,6 +94,12 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM */ def union(that: JavaDStream[T]): JavaDStream[T] = dstream.union(that.dstream) + + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): JavaDStream[T] = dstream.repartition(numPartitions) } object JavaDStream { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 978fca33ad03e..faf8f361826f7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -59,6 +59,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** Persist the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): JavaPairDStream[K, V] = dstream.repartition(numPartitions) + /** Method that generates a RDD for the given Duration */ def compute(validTime: Time): JavaPairRDD[K, V] = { dstream.compute(validTime) match { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index dc01f1e8aa0ca..5a9836a415d1f 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -183,6 +183,39 @@ public Boolean call(String s) throws Exception { assertOrderInvariantEquals(expected, result); } + @Test + public void testRepartitionMorePartitions() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4,5,6,7,8,9,10), + Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); + JavaDStream repartitioned = stream.repartition(4); + JavaTestUtils.attachTestOutputStream(repartitioned); + List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for ( List> rdd : result) { + Assert.assertEquals(4, rdd.size()); + Assert.assertEquals( + 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()); + } + } + + @Test + public void testRepartitionFewerPartitions() { + List> inputData = Arrays.asList( + Arrays.asList(1,2,3,4,5,6,7,8,9,10), + Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); + JavaDStream repartitioned = stream.repartition(2); + JavaTestUtils.attachTestOutputStream(repartitioned); + List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); + Assert.assertEquals(2, result.size()); + for ( List> rdd : result) { + Assert.assertEquals(2, rdd.size()); + Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size()); + } + } + @Test public void testGlom() { List> inputData = Arrays.asList( diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 5344ae7682fd6..780f7b823b101 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -62,6 +62,8 @@ trait JavaTestBase extends TestSuiteBase { * Process all registered streams for a numBatches batches, failing if * numExpectedOutput RDD's are not generated. Generated RDD's are collected * and returned, represented as a list for each batch interval. + * + * Returns a list of items for each RDD. */ def runStreams[V]( ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { @@ -72,6 +74,27 @@ trait JavaTestBase extends TestSuiteBase { res.map(entry => out.append(new ArrayList[V](entry))) out } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. + */ + def runStreamsWithPartitions[V]( + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { + implicit val cm: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[JList[V]]]() + res.map(entry => { + val lists = entry.map(new ArrayList[V](_)) + out.append(new ArrayList[JList[V]](lists)) + }) + out + } } object JavaTestUtils extends JavaTestBase { From a351fd4aeda7d137e4cece705e25b51d7634ca63 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 21:16:30 -0700 Subject: [PATCH 2/4] Small spacing fix --- .../test/java/org/apache/spark/streaming/JavaAPISuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 5a9836a415d1f..391d7ba21d5f6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -193,7 +193,7 @@ public void testRepartitionMorePartitions() { JavaTestUtils.attachTestOutputStream(repartitioned); List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); - for ( List> rdd : result) { + for (List> rdd : result) { Assert.assertEquals(4, rdd.size()); Assert.assertEquals( 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size()); @@ -210,7 +210,7 @@ public void testRepartitionFewerPartitions() { JavaTestUtils.attachTestOutputStream(repartitioned); List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); - for ( List> rdd : result) { + for (List> rdd : result) { Assert.assertEquals(2, rdd.size()); Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size()); } From e5f6d5697b43ac89a50fb791f4b284409e75b1f4 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 22:08:06 -0700 Subject: [PATCH 3/4] Spacing fix --- core/src/test/scala/org/apache/spark/JavaAPISuite.java | 4 ++-- .../java/org/apache/spark/streaming/JavaAPISuite.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index f38c607d653e5..352036f182e24 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -475,7 +475,7 @@ public Iterable call(Iterator iter) { @Test public void repartition() { // Shrinking number of partitions - JavaRDD in1 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 2); + JavaRDD in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2); JavaRDD repartitioned1 = in1.repartition(4); List> result1 = repartitioned1.glom().collect(); Assert.assertEquals(4, result1.size()); @@ -484,7 +484,7 @@ public void repartition() { } // Growing number of partitions - JavaRDD in2 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 4); + JavaRDD in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4); JavaRDD repartitioned2 = in2.repartition(2); List> result2 = repartitioned2.glom().collect(); Assert.assertEquals(2, result2.size()); diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 391d7ba21d5f6..9da8adda837a2 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -186,8 +186,8 @@ public Boolean call(String s) throws Exception { @Test public void testRepartitionMorePartitions() { List> inputData = Arrays.asList( - Arrays.asList(1,2,3,4,5,6,7,8,9,10), - Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); JavaDStream repartitioned = stream.repartition(4); JavaTestUtils.attachTestOutputStream(repartitioned); @@ -203,8 +203,8 @@ public void testRepartitionMorePartitions() { @Test public void testRepartitionFewerPartitions() { List> inputData = Arrays.asList( - Arrays.asList(1,2,3,4,5,6,7,8,9,10), - Arrays.asList(1,2,3,4,5,6,7,8,9,10)); + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), + Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); JavaDStream repartitioned = stream.repartition(2); JavaTestUtils.attachTestOutputStream(repartitioned); From ad5f579cbfb3f0c9834cc5fdd26ad0745f5f8abf Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 24 Oct 2013 22:18:53 -0700 Subject: [PATCH 4/4] Style fixes --- .../apache/spark/streaming/JavaTestUtils.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 780f7b823b101..5e384eeee45f3 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -33,9 +33,9 @@ trait JavaTestBase extends TestSuiteBase { * The stream will be derived from the supplied lists of Java objects. **/ def attachTestInputStream[T]( - ssc: JavaStreamingContext, - data: JList[JList[T]], - numPartitions: Int) = { + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) implicit val cm: ClassManifest[T] = @@ -50,7 +50,7 @@ trait JavaTestBase extends TestSuiteBase { * [[org.apache.spark.streaming.TestOutputStream]]. **/ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] @@ -66,7 +66,7 @@ trait JavaTestBase extends TestSuiteBase { * Returns a list of items for each RDD. */ def runStreams[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassManifest[V] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) @@ -83,16 +83,16 @@ trait JavaTestBase extends TestSuiteBase { * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each * representing one partition. */ - def runStreamsWithPartitions[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { + def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int, + numExpectedOutput: Int): JList[JList[JList[V]]] = { implicit val cm: ClassManifest[V] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[JList[V]]]() - res.map(entry => { + res.map{entry => val lists = entry.map(new ArrayList[V](_)) out.append(new ArrayList[JList[V]](lists)) - }) + } out } }