Skip to content
Snippets Groups Projects
Commit f0edeae7 authored by Kevin (Sangwoo) Kim's avatar Kevin (Sangwoo) Kim Committed by Reynold Xin
Browse files

[SPARK-6299][CORE] ClassNotFoundException in standalone mode when running...

[SPARK-6299][CORE] ClassNotFoundException in standalone mode when running groupByKey with class defined in REPL

```
case class ClassA(value: String)
val rdd = sc.parallelize(List(("k1", ClassA("v1")), ("k1", ClassA("v2")) ))
rdd.groupByKey.collect
```
This code used to throw an exception in spark-shell, because while shuffling, ```JavaSerializer``` uses ```defaultClassLoader```, which was set via ```env.serializer.setDefaultClassLoader(urlClassLoader)```.

It should be ```env.serializer.setDefaultClassLoader(replClassLoader)```, like
```
    override def run() {
      val deserializeStartTime = System.currentTimeMillis()
      Thread.currentThread.setContextClassLoader(replClassLoader)
```
in TaskRunner.

When ```replClassLoader``` cannot be defined, it is identical to ```urlClassLoader```.

Author: Kevin (Sangwoo) Kim <sangwookim.me@gmail.com>

Closes #5046 from swkimme/master and squashes the following commits:

fa2b9ee [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
6e9620b [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
d23e4e2 [Kevin (Sangwoo) Kim] stylish test codes ( collect -> collect() )
a4a3c8a [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to ReplSuite
bd00da5 [Kevin (Sangwoo) Kim] add 'class defined in repl - shuffle' test to ReplSuite
c1b1fc7 [Kevin (Sangwoo) Kim] use REPL class loader for executor's serializer
parent 9667b9f9
No related branches found
No related tags found
No related merge requests found
......@@ -103,7 +103,7 @@ private[spark] class Executor(
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(urlClassLoader)
env.serializer.setDefaultClassLoader(replClassLoader)
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
......
......@@ -121,9 +121,9 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|var v = 7
|sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
|v = 10
|sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -137,7 +137,7 @@ class ReplSuite extends FunSuite {
|class C {
|def foo = 5
|}
|sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -148,7 +148,7 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|def double(x: Int) = x + x
|sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -160,9 +160,9 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -178,9 +178,9 @@ class ReplSuite extends FunSuite {
"""
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -216,14 +216,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -262,7 +262,7 @@ class ReplSuite extends FunSuite {
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect()
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -275,7 +275,7 @@ class ReplSuite extends FunSuite {
|val t = new TestClass
|import t.testMethod
|case class TestCaseClass(value: Int)
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -287,14 +287,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -309,10 +309,22 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local[2]",
"""
|case class Foo(i: Int)
|val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
|val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
// Regression test for SPARK-6299: a case class defined inside the REPL is used as the
// value type of a groupByKey() shuffle and then collected, on a
// "local-cluster[1,1,512]" master instead of the plain "local" masters used above.
test("collecting objects of class defined in repl - shuffling") {
val output = runInterpreter("local-cluster[1,1,512]",
"""
|case class Foo(i: Int)
|val list = List((1, Foo(1)), (1, Foo(2)))
|val ret = sc.parallelize(list).groupByKey().collect()
""".stripMargin)
// The captured interpreter output must contain neither "error:" nor "Exception".
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
// The REPL must echo the grouped result, typed with the REPL-defined Foo.
assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
}
}
......@@ -128,9 +128,9 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|var v = 7
|sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
|v = 10
|sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -144,7 +144,7 @@ class ReplSuite extends FunSuite {
|class C {
|def foo = 5
|}
|sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -155,7 +155,7 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local",
"""
|def double(x: Int) = x + x
|sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -167,9 +167,9 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -185,9 +185,9 @@ class ReplSuite extends FunSuite {
"""
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -224,14 +224,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -270,7 +270,7 @@ class ReplSuite extends FunSuite {
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -283,7 +283,7 @@ class ReplSuite extends FunSuite {
|val t = new TestClass
|import t.testMethod
|case class TestCaseClass(value: Int)
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -295,14 +295,14 @@ class ReplSuite extends FunSuite {
"""
|var v = 7
|def getV() = v
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|v = 10
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
|var array = new Array[Int](5)
|val broadcastArray = sc.broadcast(array)
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
|array(0) = 5
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
......@@ -317,10 +317,22 @@ class ReplSuite extends FunSuite {
val output = runInterpreter("local[2]",
"""
|case class Foo(i: Int)
|val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
|val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
assertContains("ret: Array[Foo] = Array(Foo(1),", output)
}
// Regression test for SPARK-6299: a case class defined inside the REPL is used as the
// value type of a groupByKey() shuffle and then collected, on a
// "local-cluster[1,1,512]" master instead of the plain "local" masters used above.
test("collecting objects of class defined in repl - shuffling") {
val output = runInterpreter("local-cluster[1,1,512]",
"""
|case class Foo(i: Int)
|val list = List((1, Foo(1)), (1, Foo(2)))
|val ret = sc.parallelize(list).groupByKey().collect()
""".stripMargin)
// The captured interpreter output must contain neither "error:" nor "Exception".
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
// The REPL must echo the grouped result, typed with the REPL-defined Foo.
assertContains("ret: Array[(Int, Iterable[Foo])] = Array((1,", output)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment