From 6bb725e397471ca719db028318776775cb6a17bf Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Fri, 28 Feb 2014 16:57:27 +0800
Subject: [PATCH 01/10] fix #SPARK-1149 Bad partitioners can cause Spark to hang

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index da778aa851cd2..44822124e6928 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -847,6 +847,8 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
+    val rddPartitions = rdd.partitions.map(_.index)
+    partitions.foreach(p =>require(rddPartitions.contains(p), "partition index out of range: " +p))
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)

From 3a659031929c5357b34e5b707161bebdb0b8f340 Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Fri, 28 Feb 2014 23:14:39 +0800
Subject: [PATCH 02/10] make the code more readable

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 44822124e6928..50ffaed8000ab 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -848,7 +848,7 @@ class SparkContext(
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
     val rddPartitions = rdd.partitions.map(_.index)
-    partitions.foreach(p =>require(rddPartitions.contains(p), "partition index out of range: " +p))
+    require(partitions.forall(rddPartitions.contains(_)), "partition index out of range")
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)

From e68210a668775f1f344c7f4634bf440bbcde1d34 Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Sun, 2 Mar 2014 17:55:52 +0800
Subject: [PATCH 03/10] add partition index check to submitJob

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 50ffaed8000ab..a76fe8a2cedec 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -952,6 +952,8 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
+    val rddPartitions = rdd.partitions.map(_.index)
+    require(partitions.forall(rddPartitions.contains(_)), "partition index out of range")
    val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

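The check added in PATCH 01-03 validates every requested partition id against rddPartitions, an Array[Int] built from rdd.partitions, so each contains(p) call is a linear scan and the whole check costs O(k * n) for k requested ids and n partitions. PATCH 04 below swaps the array for a Range, whose Int overload of contains is a constant-time bounds test. A rough sketch of that difference, with an arbitrary partition count chosen purely for illustration:

    // Hypothetical cost comparison; numbers and names are illustrative only.
    val numPartitions = 10000
    val requested = 0 until numPartitions                 // partition ids handed to runJob
    val rddPartitions = (0 until numPartitions).toArray   // stand-in for rdd.partitions.map(_.index)

    // PATCH 02/03 form: Array.contains scans the array, so the overall check is O(k * n).
    requested.forall(rddPartitions.contains(_))

    // PATCH 04 form: Range.contains is a pair of comparisons, so the overall check is O(k).
    val partitionRange = 0 until numPartitions
    requested.forall(partitionRange.contains(_))
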
From 3348619bb29cb6456d28af79795acaff75e4e8a9 Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Mon, 3 Mar 2014 10:25:11 +0800
Subject: [PATCH 04/10] Optimize performance for partitions check

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a76fe8a2cedec..c5856badce604 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -847,8 +847,8 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val rddPartitions = rdd.partitions.map(_.index)
-    require(partitions.forall(rddPartitions.contains(_)), "partition index out of range")
+    val partitionRange = (0 until rdd.partitions.size)
+    require(partitions.forall(partitionRange.contains(_)), "partition index out of range")
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -952,8 +952,8 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    val rddPartitions = rdd.partitions.map(_.index)
-    require(partitions.forall(rddPartitions.contains(_)), "partition index out of range")
+    val partitionRange = (0 until rdd.partitions.size)
+    require(partitions.forall(partitionRange.contains(_)), "partition index out of range")
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

From 928e1e3783fb22fba2595c9c3f9d4b3d20af60f7 Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Tue, 4 Mar 2014 09:55:18 +0800
Subject: [PATCH 05/10] Added a unit test for PairRDDFunctions.lookup with bad partitioner

---
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 85e8eb5dc3a1e..f9e994b13dfbc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -373,6 +373,22 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(shuffled.lookup(5) === Seq(6,7))
     assert(shuffled.lookup(-1) === Seq())
   }
+
+  test("lookup with bad partitioner") {
+    val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
+
+    val p = new Partitioner {
+      def numPartitions: Int = 2
+
+      def getPartition(key: Any): Int = key.hashCode() % 2
+    }
+    val shuffled = pairs.partitionBy(p)
+
+    assert(shuffled.partitioner === Some(p))
+    assert(shuffled.lookup(1) === Seq(2))
+    intercept[IllegalArgumentException] {shuffled.lookup(-1)}
+  }
+
 }

 /*

From adc443ec23bcbaf87eb546e28e11df147d166a6f Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Tue, 4 Mar 2014 12:14:14 +0800
Subject: [PATCH 06/10] partitions check bugfix

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c5856badce604..ee1da00897f88 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -847,8 +847,7 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val partitionRange = (0 until rdd.partitions.size)
-    require(partitions.forall(partitionRange.contains(_)), "partition index out of range")
+    require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -952,8 +951,7 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    val partitionRange = (0 until rdd.partitions.size)
-    require(partitions.forall(partitionRange.contains(_)), "partition index out of range")
+    require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

From ac006a347dec32f7a92bd2117f2647c9e223c0ea Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Tue, 4 Mar 2014 17:15:21 +0800
Subject: [PATCH 07/10] code Formatting

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ee1da00897f88..e6b1c3ca81d78 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -847,7 +847,8 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")
+    require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty,
+      "partition index out of range")
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -951,7 +952,8 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")
+    require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty,
+      "partition index out of range")
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

From d0a600520d3e5473f86a16ea0067ccad64d61d3f Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Wed, 5 Mar 2014 10:04:52 +0800
Subject: [PATCH 08/10] review comment

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e6b1c3ca81d78..0576cdc80ae6c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -847,8 +847,8 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty,
-      "partition index out of range")
+    val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
+    require(outIndex.isEmpty,"Partition index out of bounds: "+ outIndex.mkString(","))
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -952,8 +952,8 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty,
-      "partition index out of range")
+    val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
+    require(outIndex.isEmpty,"Partition index out of bounds: "+ outIndex.mkString(","))
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

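The outIndex form introduced in PATCH 08 checks the requested ids against the partition indices the RDD actually reports and lists every offending id in the error message, whereas the final form in PATCH 10 below only bounds-checks each id against rdd.partitions.size, assuming the indices run contiguously from 0 (the TODO added in that patch flags exactly this assumption). A small sketch of the behavioural difference, using made-up values rather than a real RDD:

    // Hypothetical illustration only; actualIndices stands in for rdd.partitions.map(_.index).
    val actualIndices = Set(0, 1, 2, 3)
    val requested = Seq(1, 5, -1)

    // PATCH 08/09 style: the set difference names every unknown id at once.
    val outIndex = requested.toSet.diff(actualIndices)
    require(outIndex.isEmpty, "Partition index out of bounds: " + outIndex.mkString(","))
    // throws IllegalArgumentException listing both offending ids (5 and -1)

    // PATCH 10 style: a plain bounds check that stops at the first bad id.
    requested.foreach { p =>
      require(p >= 0 && p < actualIndices.size, s"Invalid partition requested: $p")
    }
    // throws IllegalArgumentException on the first offending id it reaches (5)
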
From b0d5c07eefb1844db55f7cb7a7227e6942005438 Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Wed, 5 Mar 2014 10:27:10 +0800
Subject: [PATCH 09/10] review comment

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0576cdc80ae6c..5de899a2d4ce9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -848,7 +848,7 @@ class SparkContext(
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
     val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
-    require(outIndex.isEmpty,"Partition index out of bounds: "+ outIndex.mkString(","))
+    require(outIndex.isEmpty,"Partition index out of bounds: " + outIndex.mkString(","))
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -953,7 +953,7 @@ class SparkContext(
       resultFunc: => R): SimpleFutureAction[R] =
   {
     val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
-    require(outIndex.isEmpty,"Partition index out of bounds: "+ outIndex.mkString(","))
+    require(outIndex.isEmpty,"Partition index out of bounds: " + outIndex.mkString(","))
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

From 3dad595e0509535d28a04ea21fd49488aec5b7af Mon Sep 17 00:00:00 2001
From: liguoqiang
Date: Fri, 7 Mar 2014 10:29:12 +0800
Subject: [PATCH 10/10] review comment

---
 .../main/scala/org/apache/spark/SparkContext.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b1b6491ab5678..eef469631141d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -851,8 +851,10 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
-    require(outIndex.isEmpty,"Partition index out of bounds: " + outIndex.mkString(","))
+    // TODO: All RDDs have continuous index space. How to ensure this?
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -956,8 +958,9 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
-    val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
-    require(outIndex.isEmpty,"Partition index out of bounds: " + outIndex.mkString(","))
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

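Taken together, the series makes the failure from SPARK-1149 fail fast on the driver instead of hanging. The regression test in PATCH 05 captures the scenario: a Partitioner whose getPartition returns key.hashCode() % 2 yields -1 for negative keys, so lookup(-1) asks runJob for a partition that does not exist. A driver-side sketch of the behaviour with the series applied, assuming an already-created SparkContext named sc:

    import org.apache.spark.Partitioner
    import org.apache.spark.SparkContext._   // pair-RDD implicits in Spark 0.9/1.0-era code

    // Hypothetical driver snippet; `sc` is assumed to exist.
    val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))

    // A "bad" partitioner: hashCode() % 2 can be negative.
    val badPartitioner = new Partitioner {
      def numPartitions: Int = 2
      def getPartition(key: Any): Int = key.hashCode() % 2
    }
    val shuffled = pairs.partitionBy(badPartitioner)

    shuffled.lookup(1)    // key 1 maps to partition 1: returns Seq(2)
    shuffled.lookup(-1)   // key -1 maps to partition -1: runJob now rejects it with an
                          // IllegalArgumentException whose message includes
                          // "Invalid partition requested: -1", instead of hanging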