diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index c319a57fdd10af7e441db9ec642e68d52b414d85..f1361546a322fe214d46cc6ed714181a15b61f39 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -20,9 +20,10 @@ package spark import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -import spark.rdd.ShuffledRDD import spark.SparkContext._ import spark.ShuffleSuite.NonJavaSerializableClass +import spark.rdd.OrderedRDDFunctions +import spark.rdd.ShuffledRDD import spark.util.MutablePair @@ -137,12 +138,27 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { sc = new SparkContext("local-cluster[2,1,512]", "test") def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2) val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1)) - val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data) + val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2) val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2)) .collect() data.foreach { pair => results should contain (pair) } } + + test("sorting using mutable pairs") { + // This is not in SortingSuite because of the local cluster setup. + // Use a local cluster with 2 processes to make sure there are both local and remote blocks + sc = new SparkContext("local-cluster[2,1,512]", "test") + def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2) + val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22)) + val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2) + val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs) + .sortByKey().collect() + results(0) should be (p(1, 11)) + results(1) should be (p(2, 22)) + results(2) should be (p(3, 33)) + results(3) should be (p(100, 100)) + } } object ShuffleSuite {