Commit 17f49992 authored by maxwell, committed by Tathagata Das

[SPARK-5569][STREAMING] Fix ObjectInputStreamWithLoader to support loading array classes.

When the Kafka DirectStream API is used to create a checkpoint, a ClassNotFoundException
occurs when the saved checkpoint is restored on restart.

The reason for this error is that ObjectInputStreamWithLoader extends ObjectInputStream and overrides its resolveClass method. But instead of using Class.forName(desc, false, loader), Spark used loader.loadClass(desc) to resolve the class, which does not work for array classes.

For example:
Class.forName("[Lorg.apache.spark.streaming.kafka.OffsetRange;", false, loader) works, while loader.loadClass("[Lorg.apache.spark.streaming.kafka.OffsetRange;") throws a ClassNotFoundException.

Details of the difference between Class.forName and ClassLoader.loadClass can be found here:
http://bugs.java.com/view_bug.do?bug_id=6446627
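The difference can be illustrated with a minimal, self-contained sketch that uses a JDK array class instead of Spark's OffsetRange, so it runs without Spark on the classpath (the class and object names here are illustrative, not from the patch):

```scala
object ForNameVsLoadClass {
  def main(args: Array[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader

    // Class.forName understands JVM array descriptors of the form "[L...;"
    val arrayClass = Class.forName("[Ljava.lang.String;", false, loader)
    println(arrayClass.isArray) // true

    // ClassLoader.loadClass does not, and throws ClassNotFoundException
    try {
      loader.loadClass("[Ljava.lang.String;")
    } catch {
      case _: ClassNotFoundException =>
        println("loadClass threw ClassNotFoundException")
    }
  }
}
```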

Author: maxwell <maxwellzdm@gmail.com>
Author: DEMING ZHU <deming.zhu@linecorp.com>

Closes #8955 from maxwellzdm/master.
parent 8f888eea
@@ -352,7 +352,9 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade
   override def resolveClass(desc: ObjectStreamClass): Class[_] = {
     try {
-      return loader.loadClass(desc.getName())
+      // scalastyle:off classforname
+      return Class.forName(desc.getName(), false, loader)
+      // scalastyle:on classforname
     } catch {
       case e: Exception =>
     }
...
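In context, the patched method reads roughly as follows. This is a sketch, not the verbatim Spark file: the fallback to super.resolveClass after the catch lies outside the hunk shown above and is assumed from the surrounding class.

```scala
import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}

class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
  extends ObjectInputStream(inputStream_) {

  override def resolveClass(desc: ObjectStreamClass): Class[_] = {
    try {
      // Class.forName resolves array descriptors such as "[LFoo;",
      // which loader.loadClass(desc.getName()) cannot
      // scalastyle:off classforname
      return Class.forName(desc.getName(), false, loader)
      // scalastyle:on classforname
    } catch {
      case e: Exception =>
    }
    // fall back to the default resolution if the custom loader fails
    super.resolveClass(desc)
  }
}
```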
@@ -17,7 +17,8 @@
 package org.apache.spark.streaming

-import java.io.File
+import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
+import org.apache.spark.TestUtils

 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -34,7 +35,7 @@ import org.scalatest.time.SpanSugar._

 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}

 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -579,6 +580,36 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }

+  // This tests whether spark can deserialize array object
+  // refer to SPARK-5569
+  test("recovery from checkpoint contains array object") {
+    // create a class which is invisible to app class loader
+    val jar = TestUtils.createJarWithClasses(
+      classNames = Seq("testClz"),
+      toStringValue = "testStringValue")
+
+    // invisible to current class loader
+    val appClassLoader = getClass.getClassLoader
+    intercept[ClassNotFoundException](appClassLoader.loadClass("testClz"))
+
+    // visible to mutableURLClassLoader
+    val loader = new MutableURLClassLoader(
+      Array(jar), appClassLoader)
+    assert(loader.loadClass("testClz").newInstance().toString == "testStringValue")
+
+    // create and serialize Array[testClz]
+    // scalastyle:off classforname
+    val arrayObj = Class.forName("[LtestClz;", false, loader)
+    // scalastyle:on classforname
+    val bos = new ByteArrayOutputStream()
+    new ObjectOutputStream(bos).writeObject(arrayObj)
+
+    // deserialize the Array[testClz]
+    val ois = new ObjectInputStreamWithLoader(
+      new ByteArrayInputStream(bos.toByteArray), loader)
+    assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
+  }
+
 /**
  * Tests a streaming operation under checkpointing, by restarting the operation
...
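The new test relies on Spark's TestUtils and MutableURLClassLoader helpers. The same round trip can be reproduced with only the JDK by serializing a standard array class; this is a sketch of the mechanism the test exercises, not the Spark test itself:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

object ArrayClassRoundTrip {
  def main(args: Array[String]): Unit = {
    // serialize the Class object of an array type
    val bos = new ByteArrayOutputStream()
    new ObjectOutputStream(bos).writeObject(classOf[Array[String]])

    // a resolveClass override in the style of the patched stream
    val loader = Thread.currentThread().getContextClassLoader
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName(), false, loader)
    }
    // deserialization succeeds because Class.forName handles "[L...;" names
    println(ois.readObject().asInstanceOf[Class[_]].getName) // [Ljava.lang.String;
  }
}
```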