diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 34078142f538596949b8c67cf4b0570dd6e5e179..be081c3825566296308a3edfca8b509dacce99c7 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -43,11 +43,20 @@ private[spark] object TestUtils { * Note: if this is used during class loader tests, class names should be unique * in order to avoid interference between tests. */ - def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = { + def createJarWithClasses( + classNames: Seq[String], + toStringValue: String = "", + classNamesWithBase: Seq[(String, String)] = Seq(), + classpathUrls: Seq[URL] = Seq()): URL = { val tempDir = Utils.createTempDir() - val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value) + val files1 = for (name <- classNames) yield { + createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls) + } + val files2 = for ((childName, baseName) <- classNamesWithBase) yield { + createCompiledClass(childName, tempDir, toStringValue, baseName, classpathUrls) + } val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis())) - createJar(files, jarFile) + createJar(files1 ++ files2, jarFile) } @@ -85,15 +94,26 @@ private[spark] object TestUtils { } /** Creates a compiled class with the given name. Class file will be placed in destDir. */ - def createCompiledClass(className: String, destDir: File, value: String = ""): File = { + def createCompiledClass( + className: String, + destDir: File, + toStringValue: String = "", + baseClass: String = null, + classpathUrls: Seq[URL] = Seq()): File = { val compiler = ToolProvider.getSystemJavaCompiler + val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") val sourceFile = new JavaSourceFromString(className, - "public class " + className + " implements java.io.Serializable {" + - " @Override public String toString() { return \"" + value + "\"; }}") + "public class " + className + extendsText + " implements java.io.Serializable {" + + " @Override public String toString() { return \"" + toStringValue + "\"; }}") // Calling this outputs a class file in pwd. It's easier to just rename the file than // build a custom FileManager that controls the output location. - compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call() + val options = if (classpathUrls.nonEmpty) { + Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator)) + } else { + Seq() + } + compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call() val fileName = className + ".class" val result = new File(fileName) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 8bbfcd2997dc60fc0465d913e7767a5c5e1a63b4..9d25e647f1703bf00c10e90c6495dcbdde9ccffb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -20,11 +20,9 @@ package org.apache.spark.deploy import java.io.{File, PrintStream} import java.lang.reflect.{Modifier, InvocationTargetException} import java.net.URL - import scala.collection.mutable.{ArrayBuffer, HashMap, Map} import org.apache.hadoop.fs.Path - import org.apache.ivy.Ivy import org.apache.ivy.core.LogOptions import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor} @@ -35,9 +33,10 @@ import org.apache.ivy.core.retrieve.RetrieveOptions import org.apache.ivy.core.settings.IvySettings import org.apache.ivy.plugins.matcher.GlobPatternMatcher import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver} - import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.util.Utils +import org.apache.spark.executor.ChildExecutorURLClassLoader +import org.apache.spark.executor.MutableURLClassLoader /** * Main gateway of launching a Spark application. @@ -389,8 +388,14 @@ object SparkSubmit { printStream.println("\n") } - val loader = new ExecutorURLClassLoader(new Array[URL](0), - Thread.currentThread.getContextClassLoader) + val loader = + if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) { + new ChildExecutorURLClassLoader(new Array[URL](0), + Thread.currentThread.getContextClassLoader) + } else { + new ExecutorURLClassLoader(new Array[URL](0), + Thread.currentThread.getContextClassLoader) + } Thread.currentThread.setContextClassLoader(loader) for (jar <- childClasspath) { @@ -434,7 +439,7 @@ object SparkSubmit { } } - private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) { + private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) { val uri = Utils.resolveURI(localJar) uri.getScheme match { case "file" | "local" => diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala index 218ed7b5d2d39875084fb6b63773d65e92563106..8011e75944aac9a9d5fb1827d980f3ea5616576e 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala @@ -39,7 +39,17 @@ private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: Class super.addURL(url) } override def findClass(name: String): Class[_] = { - super.findClass(name) + val loaded = super.findLoadedClass(name) + if (loaded != null) { + return loaded + } + try { + super.findClass(name) + } catch { + case e: ClassNotFoundException => { + parentClassLoader.loadClass(name) + } + } } } diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala index e2050e95a1b88911e13deb1c3ce8861e02244e0f..b7912c09d14105f386378344745200f2de7c9657 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala @@ -26,10 +26,14 @@ import org.apache.spark.util.Utils class ExecutorURLClassLoaderSuite extends FunSuite { - val childClassNames = List("FakeClass1", "FakeClass2") - val parentClassNames = List("FakeClass1", "FakeClass2", "FakeClass3") - val urls = List(TestUtils.createJarWithClasses(childClassNames, "1")).toArray - val urls2 = List(TestUtils.createJarWithClasses(parentClassNames, "2")).toArray + val urls2 = List(TestUtils.createJarWithClasses( + classNames = Seq("FakeClass1", "FakeClass2", "FakeClass3"), + toStringValue = "2")).toArray + val urls = List(TestUtils.createJarWithClasses( + classNames = Seq("FakeClass1"), + classNamesWithBase = Seq(("FakeClass2", "FakeClass3")), // FakeClass3 is in parent + toStringValue = "1", + classpathUrls = urls2)).toArray test("child first") { val parentLoader = new URLClassLoader(urls2, null) @@ -37,6 +41,8 @@ class ExecutorURLClassLoaderSuite extends FunSuite { val fakeClass = classLoader.loadClass("FakeClass2").newInstance() val fakeClassVersion = fakeClass.toString assert(fakeClassVersion === "1") + val fakeClass2 = classLoader.loadClass("FakeClass2").newInstance() + assert(fakeClass.getClass === fakeClass2.getClass) } test("parent first") { @@ -45,6 +51,8 @@ class ExecutorURLClassLoaderSuite extends FunSuite { val fakeClass = classLoader.loadClass("FakeClass1").newInstance() val fakeClassVersion = fakeClass.toString assert(fakeClassVersion === "2") + val fakeClass2 = classLoader.loadClass("FakeClass1").newInstance() + assert(fakeClass.getClass === fakeClass2.getClass) } test("child first can fall back") {