diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a897e532184ac5bf1172e3c482aeb852291ff5bc..6196f7b165049652d2dddf75886e83aaaf872e1d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -103,7 +103,7 @@ private[spark] class Executor( private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) // Set the classloader for serializer - env.serializer.setDefaultClassLoader(urlClassLoader) + env.serializer.setDefaultClassLoader(replClassLoader) // Akka's message frame size. If task result is bigger than this, we use the block manager // to send the result back. diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 249f438459300df4fdac94e0d2afb5f0330b4540..934daaeaafca1c81628a3bd4abb3ac3ffff5ef65 100644 --- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -121,9 +121,9 @@ class ReplSuite extends FunSuite { val output = runInterpreter("local", """ |var v = 7 - |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_) |v = 10 - |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_) """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -137,7 +137,7 @@ class ReplSuite extends FunSuite { |class C { |def foo = 5 |} - |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_) """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -148,7 +148,7 @@ class ReplSuite extends FunSuite { val output = runInterpreter("local", """ |def double(x: Int) = x + x - |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_) """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -160,9 +160,9 @@ class ReplSuite extends FunSuite { """ |var v = 7 |def getV() = v - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |v = 10 - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -178,9 +178,9 @@ class ReplSuite extends FunSuite { """ |var array = new Array[Int](5) |val broadcastArray = sc.broadcast(array) - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() |array(0) = 5 - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -216,14 +216,14 @@ class ReplSuite extends FunSuite { """ |var v = 7 |def getV() = v - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |v = 10 - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |var array = new Array[Int](5) |val broadcastArray = sc.broadcast(array) - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() |array(0) = 5 - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -262,7 +262,7 @@ class ReplSuite extends FunSuite { |val sqlContext = new org.apache.spark.sql.SQLContext(sc) |import sqlContext.implicits._ |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect() + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -275,7 +275,7 @@ class ReplSuite extends FunSuite { |val t = new TestClass |import t.testMethod |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -287,14 +287,14 @@ class ReplSuite extends FunSuite { """ |var v = 7 |def getV() = v - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |v = 10 - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |var array = new Array[Int](5) |val broadcastArray = sc.broadcast(array) - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() |array(0) = 5 - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -309,10 +309,22 @@ class ReplSuite extends FunSuite { val output = runInterpreter("local[2]", """ |case class Foo(i: Int) - |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect + |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) assertContains("ret: Array[Foo] = Array(Foo(1),", output) } + + test("collecting objects of class defined in repl - shuffling") { + val output = runInterpreter("local-cluster[1,1,512]", + """ + |case class Foo(i: Int) + |val list = List((1, Foo(1)), (1, Foo(2))) + |val ret = sc.parallelize(list).groupByKey().collect() + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output) + } } diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index b3bd1355481243e50a594777fd32b7fddd7c3aac..fbef5b25ba68841e7e43be4cec3d243fc4322dde 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -128,9 +128,9 @@ class ReplSuite extends FunSuite { val output = runInterpreter("local", """ |var v = 7 - |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_) |v = 10 - |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_) """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -144,7 +144,7 @@ class ReplSuite extends FunSuite { |class C { |def foo = 5 |} - |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_) """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -155,7 +155,7 @@ class ReplSuite extends FunSuite { val output = runInterpreter("local", """ |def double(x: Int) = x + x - |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_) """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -167,9 +167,9 @@ class ReplSuite extends FunSuite { """ |var v = 7 |def getV() = v - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |v = 10 - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -185,9 +185,9 @@ class ReplSuite extends FunSuite { """ |var array = new Array[Int](5) |val broadcastArray = sc.broadcast(array) - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() |array(0) = 5 - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -224,14 +224,14 @@ class ReplSuite extends FunSuite { """ |var v = 7 |def getV() = v - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |v = 10 - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |var array = new Array[Int](5) |val broadcastArray = sc.broadcast(array) - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() |array(0) = 5 - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -270,7 +270,7 @@ class ReplSuite extends FunSuite { |val sqlContext = new org.apache.spark.sql.SQLContext(sc) |import sqlContext.implicits._ |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -283,7 +283,7 @@ class ReplSuite extends FunSuite { |val t = new TestClass |import t.testMethod |case class TestCaseClass(value: Int) - |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect + |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -295,14 +295,14 @@ class ReplSuite extends FunSuite { """ |var v = 7 |def getV() = v - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |v = 10 - |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_) + |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_) |var array = new Array[Int](5) |val broadcastArray = sc.broadcast(array) - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() |array(0) = 5 - |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect + |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) @@ -317,10 +317,22 @@ class ReplSuite extends FunSuite { val output = runInterpreter("local[2]", """ |case class Foo(i: Int) - |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect + |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect() """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) assertContains("ret: Array[Foo] = Array(Foo(1),", output) } + + test("collecting objects of class defined in repl - shuffling") { + val output = runInterpreter("local-cluster[1,1,512]", + """ + |case class Foo(i: Int) + |val list = List((1, Foo(1)), (1, Foo(2))) + |val ret = sc.parallelize(list).groupByKey().collect() + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output) + } }