diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 745e3fa4e85f64427c881fb175f6279d10c64488..852ed8fe1fb9147b83f8389401c92f2aa39b18af 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -852,6 +852,9 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -955,6 +958,9 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
+    partitions.foreach{ p =>
+      require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+    }
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 85e8eb5dc3a1e9679d4c7b445875eb8a3a37bf09..f9e994b13dfbc2dc46e54e45c7a419a99c1241f2 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -373,6 +373,22 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(shuffled.lookup(5) === Seq(6,7))
     assert(shuffled.lookup(-1) === Seq())
   }
+
+  test("lookup with bad partitioner") {
+    val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
+
+    val p = new Partitioner {
+      def numPartitions: Int = 2
+
+      def getPartition(key: Any): Int = key.hashCode() % 2
+    }
+    val shuffled = pairs.partitionBy(p)
+
+    assert(shuffled.partitioner === Some(p))
+    assert(shuffled.lookup(1) === Seq(2))
+    intercept[IllegalArgumentException] {shuffled.lookup(-1)}
+  }
+
 }
 
 /*
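
For reviewers who want to see the failure mode end to end, here is a minimal standalone sketch (not part of the patch; it assumes a local SparkContext and the runJob overload that still takes an allowLocal flag, as in the Spark version this diff targets):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: demonstrates how the new require(...) in runJob surfaces to callers.
object InvalidPartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))
    val rdd = sc.parallelize(1 to 10, 2)  // two partitions: indices 0 and 1

    // Valid request: sums each requested partition.
    val sums = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum, Seq(0, 1), allowLocal = false)
    println(sums.mkString(","))

    // Invalid request: partition 5 does not exist, so with this patch the call
    // fails fast with IllegalArgumentException
    // ("requirement failed: Invalid partition requested: 5")
    // before the job ever reaches the DAGScheduler.
    try {
      sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum, Seq(5), allowLocal = false)
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }

    sc.stop()
  }
}

This is also why the new test's Partitioner is "bad": key.hashCode() % 2 evaluates to -1 for negative keys, so shuffled.lookup(-1) asks runJob for partition -1. With the patch that request is rejected up front as an IllegalArgumentException instead of failing obscurely deeper in the scheduler.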