Commit e068f21e authored by Matei Zaharia

More work on HTTP class loading

Serve the REPL's compiled classes to worker nodes over HTTP: the interpreter now writes classes to a plain output directory, exposes it through a Jetty-based ClassServer, and publishes the server's URI in the spark.repl.class.uri system property. ExecutorClassLoader can now fetch classes from either a Hadoop FileSystem or an HTTP URI.

parent 7ef3a20a
Makefile
@@ -10,6 +10,8 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
 JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
 JARS += third_party/scalatest-1.2/scalatest-1.2.jar
 JARS += third_party/scalacheck_2.8.0-1.7.jar
+JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
 CLASSPATH = $(subst $(SPACE),:,$(JARS))
 SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
run
@@ -36,6 +36,8 @@ SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
 SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
 SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar
 SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar
+SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
 for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
   SPARK_CLASSPATH+=:$jar
 done
src/scala/spark/Executor.scala
@@ -25,10 +25,10 @@ object Executor {
     // If the REPL is in use, create a ClassLoader that will be able to
     // read new classes defined by the REPL as the user types code
     classLoader = this.getClass.getClassLoader
-    val classDir = System.getProperty("spark.repl.current.classdir")
-    if (classDir != null) {
-      println("Using REPL classdir: " + classDir)
-      classLoader = new repl.ExecutorClassLoader(classDir, classLoader)
+    val classUri = System.getProperty("spark.repl.class.uri")
+    if (classUri != null) {
+      println("Using REPL class URI: " + classUri)
+      classLoader = new repl.ExecutorClassLoader(classUri, classLoader)
     }
     Thread.currentThread.setContextClassLoader(classLoader)
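This change is the executor half of a handshake with the interpreter: SparkInterpreter (further below) publishes its class server's URI in the spark.repl.class.uri system property, and each executor reads that property to decide whether to wrap its class loader. A self-contained sketch of the handshake, with an illustrative hard-coded URI standing in for a real server:

// Sketch of the spark.repl.class.uri handshake introduced in this commit.
// The hard-coded URI is illustrative; SparkInterpreter sets the real value
// after starting its Jetty ClassServer.
object ClassUriHandshakeSketch {
  def main(args: Array[String]) {
    // Interpreter side: publish the class server's URI.
    System.setProperty("spark.repl.class.uri", "http://127.0.0.1:8080")

    // Executor side: if the property is set, the REPL is in use, so wrap
    // the default loader in a loader that can also fetch classes generated
    // by the interpreter.
    var classLoader = getClass.getClassLoader
    val classUri = System.getProperty("spark.repl.class.uri")
    if (classUri != null) {
      println("Using REPL class URI: " + classUri)
      // classLoader = new spark.repl.ExecutorClassLoader(classUri, classLoader)
    }
    Thread.currentThread.setContextClassLoader(classLoader)
  }
}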
src/scala/spark/repl/ExecutorClassLoader.scala
@@ -1,7 +1,7 @@
 package spark.repl

 import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.net.{URI, URL, URLClassLoader, URLEncoder}
 import java.util.concurrent.{Executors, ExecutorService}

 import org.apache.hadoop.conf.Configuration
@@ -12,18 +12,35 @@ import org.objectweb.asm.commons.EmptyVisitor
 import org.objectweb.asm.Opcodes._

-// A ClassLoader that reads classes from a Hadoop FileSystem URL, used to load
-// classes defined by the interpreter when the REPL is in use
-class ExecutorClassLoader(classDir: String, parent: ClassLoader)
+/**
+ * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
+ * used to load classes defined by the interpreter when the REPL is used
+ */
+class ExecutorClassLoader(classUri: String, parent: ClassLoader)
 extends ClassLoader(parent) {
-  val fileSystem = FileSystem.get(new URI(classDir), new Configuration())
-  val directory = new URI(classDir).getPath
+  val uri = new URI(classUri)
+  val directory = uri.getPath
+
+  // Hadoop FileSystem object for our URI, if it isn't using HTTP
+  var fileSystem: FileSystem = {
+    if (uri.getScheme() == "http")
+      null
+    else
+      FileSystem.get(uri, new Configuration())
+  }

   override def findClass(name: String): Class[_] = {
     try {
       //println("repl.ExecutorClassLoader resolving " + name)
-      val path = new Path(directory, name.replace('.', '/') + ".class")
-      val bytes = readAndTransformClass(name, fileSystem.open(path))
+      val pathInDirectory = name.replace('.', '/') + ".class"
+      val inputStream = {
+        if (fileSystem != null)
+          fileSystem.open(new Path(directory, pathInDirectory))
+        else
+          new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+      }
+      val bytes = readAndTransformClass(name, inputStream)
+      inputStream.close()
       return defineClass(name, bytes, 0, bytes.length)
     } catch {
       case e: Exception => throw new ClassNotFoundException(name, e)
@@ -57,6 +74,13 @@ extends ClassLoader(parent) {
     return bos.toByteArray
   }
+
+  /**
+   * URL-encode a string, preserving only slashes
+   */
+  def urlEncode(str: String): String = {
+    str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
+  }
 }

 class ConstructorCleaner(className: String, cv: ClassVisitor)
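One detail worth noting in the new HTTP path: class files generated by the REPL contain characters such as $ that are not safe in a URL, so urlEncode escapes each path segment individually, leaving the / separators intact. A runnable sketch of that behavior (the example class name is merely illustrative of REPL-generated names):

import java.net.URLEncoder

// Standalone sketch of the urlEncode helper above: each path segment is
// percent-encoded individually so the '/' separators survive intact.
object UrlEncodeSketch {
  def urlEncode(str: String): String =
    str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")

  def main(args: Array[String]) {
    println(urlEncode("line1/$read$$iw$$iw.class"))
    // prints: line1/%24read%24%24iw%24%24iw.class
  }
}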
src/scala/spark/repl/SparkInterpreter.scala
@@ -90,32 +90,40 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
   val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")

-  /** directory to save .class files to */
-  //val virtualDirectory = new VirtualDirectory("(memory)", None)
-  val virtualDirectory = {
+  val outputDir = {
     val rootDir = new File(System.getProperty("spark.repl.classdir",
       System.getProperty("java.io.tmpdir")))
     var attempts = 0
     val maxAttempts = 10
-    var outputDir: File = null
-    while (outputDir == null) {
+    var dir: File = null
+    while (dir == null) {
       attempts += 1
       if (attempts > maxAttempts) {
         throw new IOException("Failed to create a temp directory " +
           "after " + maxAttempts + " attempts!")
       }
       try {
-        outputDir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
-        if (outputDir.exists() || !outputDir.mkdirs())
-          outputDir = null
+        dir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
+        if (dir.exists() || !dir.mkdirs())
+          dir = null
       } catch { case e: IOException => ; }
     }
-    System.setProperty("spark.repl.current.classdir",
-      "file://" + outputDir.getAbsolutePath + "/")
     if (SPARK_DEBUG_REPL)
-      println("Output directory: " + outputDir)
-    new PlainFile(outputDir)
+      println("Output directory: " + dir)
+    dir
   }
+
+  /** directory to save .class files to */
+  //val virtualDirectory = new VirtualDirectory("(memory)", None)
+  val virtualDirectory = new PlainFile(outputDir)
+
+  /** Jetty server that will serve our classes to worker nodes */
+  val classServer = new ClassServer(outputDir)
+
+  // Start the classServer and remember its URI in a spark system property
+  classServer.start()
+  println("ClassServer started, URI = " + classServer.uri)
+  System.setProperty("spark.repl.class.uri", classServer.uri)

   /** reporter */
   object reporter extends ConsoleReporter(settings, null, out) {

@@ -714,6 +722,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
    */
   def close() {
     reporter.flush
+    classServer.stop()
   }

   /** A traverser that finds all mentioned identifiers, i.e. things
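The ClassServer class itself does not appear in this diff; only its start()/uri/stop() surface is visible from the usage above. Given the Jetty server and servlet-api jars added to the build, a plausible minimal sketch of such a server is below. The ClassServerSketch name and all internals are assumptions, not the actual implementation:

import java.io.File
import java.net.InetAddress
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.ResourceHandler

// Hypothetical sketch of a ClassServer: serve the REPL's output directory
// of .class files over HTTP so ExecutorClassLoader can fetch them.
class ClassServerSketch(outputDir: File) {
  private val server = new Server(0)  // port 0 = let the OS pick a free port

  def start() {
    val handler = new ResourceHandler
    handler.setResourceBase(outputDir.getAbsolutePath)
    server.setHandler(handler)
    server.start()
  }

  // Only meaningful after start(), once the ephemeral port is bound
  def uri: String = {
    val port = server.getConnectors()(0).getLocalPort
    "http://" + InetAddress.getLocalHost.getHostAddress + ":" + port
  }

  def stop() { server.stop() }
}

Binding to port 0 suits a per-REPL-session server: each interpreter gets its own port, and the resulting URI is what ends up in spark.repl.class.uri.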