From d8f52dc7dc34bc6c1c368d790b7bdfe30c4eb529 Mon Sep 17 00:00:00 2001 From: maji2014 Date: Tue, 2 Dec 2014 01:54:33 -0800 Subject: [PATCH 1/3] code optimization for judgement --- .../scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala | 2 +- .../scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 2 +- .../scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 5baf45db45c17..ffbb2717053d2 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -45,7 +45,7 @@ private[spark] class HashShuffleReader[K, C]( } else { new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } - } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { + } else if (dep.mapSideCombine) { throw new IllegalStateException("Aggregator is empty for map-side combine") } else { // Convert the Product2s to pairs since this is what downstream RDDs currently expect diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 183a30373b28c..8f05d3b048b17 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -56,7 +56,7 @@ private[spark] class HashShuffleWriter[K, V]( } else { records } - } else if (dep.aggregator.isEmpty && dep.mapSideCombine) { + } else if (dep.mapSideCombine) { throw new IllegalStateException("Aggregator is empty for map-side combine") } else { records diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index d75f9d7311fad..cf0a0d5e83be1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -50,7 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C]( /** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { if (dep.mapSideCombine) { - if (!dep.aggregator.isDefined) { + if (dep.aggregator.isEmpty) { throw new IllegalStateException("Aggregator is empty for map-side combine") } sorter = new ExternalSorter[K, V, C]( From 10d0cf0e5282bff251c1195e2d423952f36b92a7 Mon Sep 17 00:00:00 2001 From: maji2014 Date: Tue, 2 Dec 2014 21:38:46 -0800 Subject: [PATCH 2/3] change a elegant way --- .../org/apache/spark/shuffle/hash/HashShuffleReader.scala | 4 ++-- .../org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index ffbb2717053d2..de72148ccc7ac 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -45,9 +45,9 @@ private[spark] class HashShuffleReader[K, C]( } else { new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) } - } else if (dep.mapSideCombine) { - throw new IllegalStateException("Aggregator is empty for map-side combine") } else { + require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") + // Convert the Product2s to pairs since this is what downstream RDDs currently expect iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2)) } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index 8f05d3b048b17..755f17d6aa15a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -56,9 +56,8 @@ private[spark] class HashShuffleWriter[K, V]( } else { records } - } else if (dep.mapSideCombine) { - throw new IllegalStateException("Aggregator is empty for map-side combine") } else { + require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") records } From bf7b14d915b1823662c1944bdc60b2a07e416422 Mon Sep 17 00:00:00 2001 From: maji2014 Date: Wed, 3 Dec 2014 17:27:34 -0800 Subject: [PATCH 3/3] change a elegant way for SortShuffleWriter.scala --- .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index cf0a0d5e83be1..27496c5a289cb 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -50,9 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C]( /** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { if (dep.mapSideCombine) { - if (dep.aggregator.isEmpty) { - throw new IllegalStateException("Aggregator is empty for map-side combine") - } + require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") sorter = new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) sorter.insertAll(records)