From 5d1ec64e7934ad7f922cdab516fa5de690644780 Mon Sep 17 00:00:00 2001
From: liguoqiang <liguoqiang@rd.tuan800.com>
Date: Wed, 12 Mar 2014 12:59:51 -0700
Subject: [PATCH] Fix #SPARK-1149 Bad partitioners can cause Spark to hang

Author: liguoqiang <liguoqiang@rd.tuan800.com>

Closes #44 from witgo/SPARK-1149 and squashes the following commits:

3dcdcaf [liguoqiang] Merge branch 'master' into SPARK-1149
8425395 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149
3dad595 [liguoqiang] review comment
e3e56aa [liguoqiang] Merge branch 'master' into SPARK-1149
b0d5c07 [liguoqiang] review comment
d0a6005 [liguoqiang] review comment
3395ee7 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149
ac006a3 [liguoqiang] code Formatting
3feb3a8 [liguoqiang] Merge branch 'master' into SPARK-1149
adc443e [liguoqiang] partitions check bugfix
928e1e3 [liguoqiang] Added a unit test for PairRDDFunctions.lookup with bad partitioner
db6ecc5 [liguoqiang] Merge branch 'master' into SPARK-1149
1e3331e [liguoqiang] Merge branch 'master' into SPARK-1149
3348619 [liguoqiang] Optimize performance for partitions check
61e5a87 [liguoqiang] Merge branch 'master' into SPARK-1149
e68210a [liguoqiang] add partition index check to submitJob
3a65903 [liguoqiang] make the code more readable
6bb725e [liguoqiang] fix #SPARK-1149 Bad partitioners can cause Spark to hang
---
 .../scala/org/apache/spark/SparkContext.scala     |  6 ++++++
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala  | 16 ++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 745e3fa4e8..852ed8fe1f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -852,6 +852,9 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -955,6 +958,9 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 85e8eb5dc3..f9e994b13d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -373,6 +373,22 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(shuffled.lookup(5) === Seq(6,7))
     assert(shuffled.lookup(-1) === Seq())
   }
+
+  test("lookup with bad partitioner") {
+    val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
+
+    val p = new Partitioner {
+      def numPartitions: Int = 2
+
+      def getPartition(key: Any): Int = key.hashCode() % 2
+    }
+    val shuffled = pairs.partitionBy(p)
+
+    assert(shuffled.partitioner === Some(p))
+    assert(shuffled.lookup(1) === Seq(2))
+    intercept[IllegalArgumentException] {shuffled.lookup(-1)}
+  }
+
 }

 /*
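
Note (not part of the patch): the partitioner in the new test is "bad" because, in Scala as in Java, the % operator keeps the sign of its left operand, so key.hashCode() % 2 returns -1 for a key with a negative hash code. Before this fix, lookup(-1) would submit a job for partition -1, which does not exist, and the job could wait indefinitely (the hang described in SPARK-1149); with the added require checks, runJob and submitJob now fail fast with an IllegalArgumentException. The standalone Scala sketch below (the object name is hypothetical, used only for illustration) shows the negative-modulo behaviour in isolation:

object NegativeModuloDemo {
  // Same logic as getPartition in the new test: the result can be negative.
  def getPartition(key: Any): Int = key.hashCode() % 2

  def main(args: Array[String]): Unit = {
    println(getPartition(1))   // prints 1: a valid index for numPartitions = 2
    println(getPartition(-1))  // prints -1: outside [0, numPartitions), hence "bad"
  }
}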