From e068f21e0108b89dc9ecf84d98f2ba619d6435e2 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Tue, 28 Sep 2010 22:32:38 -0700
Subject: [PATCH] More work on HTTP class loading

Serve REPL-generated classes to executors over HTTP instead of through a
shared filesystem path: the interpreter now writes .class files to a temp
directory, serves that directory through a Jetty-based ClassServer, and
advertises the server's address in the spark.repl.class.uri system
property (replacing spark.repl.current.classdir). ExecutorClassLoader can
load classes from either a Hadoop FileSystem or an HTTP URI.
---
 Makefile                                      |  2 +
 run                                           |  2 +
 src/scala/spark/Executor.scala                |  8 ++--
 .../spark/repl/ExecutorClassLoader.scala      | 40 +++++++++++++++----
 src/scala/spark/repl/SparkInterpreter.scala   | 33 +++++++++------
 5 files changed, 61 insertions(+), 24 deletions(-)

diff --git a/Makefile b/Makefile
index 9c9ebb6a82..1a3e97a7be 100644
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,8 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
 JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
 JARS += third_party/scalatest-1.2/scalatest-1.2.jar
 JARS += third_party/scalacheck_2.8.0-1.7.jar
+JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
 CLASSPATH = $(subst $(SPACE),:,$(JARS))
 
 SCALA_SOURCES =  src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
diff --git a/run b/run
index 7e044eb021..33eec9fe1b 100755
--- a/run
+++ b/run
@@ -36,6 +36,8 @@ SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
 SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
 SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar
 SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar
+SPARK_CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+SPARK_CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
 for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
   SPARK_CLASSPATH+=:$jar
 done
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala
index 68f4cb8aae..58b20b41dc 100644
--- a/src/scala/spark/Executor.scala
+++ b/src/scala/spark/Executor.scala
@@ -25,10 +25,10 @@ object Executor {
         // If the REPL is in use, create a ClassLoader that will be able to
         // read new classes defined by the REPL as the user types code
         classLoader = this.getClass.getClassLoader
-        val classDir = System.getProperty("spark.repl.current.classdir")
-        if (classDir != null) {
-          println("Using REPL classdir: " + classDir)
-          classLoader = new repl.ExecutorClassLoader(classDir, classLoader)
+        val classUri = System.getProperty("spark.repl.class.uri")
+        if (classUri != null) {
+          println("Using REPL class URI: " + classUri)
+          classLoader = new repl.ExecutorClassLoader(classUri, classLoader)
         }
         Thread.currentThread.setContextClassLoader(classLoader)
         
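Note: the executor learns about the REPL's classes through this one system
property, so no shared filesystem is needed. A minimal sketch of the handoff
under the new scheme (the URI value is made up; the executor side mirrors the
hunk above):

    // Driver side: SparkInterpreter advertises the class server's address
    // (hypothetical value shown; see the SparkInterpreter diff below).
    System.setProperty("spark.repl.class.uri", "http://10.0.0.1:33333")

    // Executor side: wrap the default loader only when the property is set,
    // so non-REPL jobs keep the plain application ClassLoader.
    var classLoader = getClass.getClassLoader
    val classUri = System.getProperty("spark.repl.class.uri")
    if (classUri != null)
      classLoader = new spark.repl.ExecutorClassLoader(classUri, classLoader)
    Thread.currentThread.setContextClassLoader(classLoader)
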
diff --git a/src/scala/spark/repl/ExecutorClassLoader.scala b/src/scala/spark/repl/ExecutorClassLoader.scala
index 7d91b20e79..0833b319f7 100644
--- a/src/scala/spark/repl/ExecutorClassLoader.scala
+++ b/src/scala/spark/repl/ExecutorClassLoader.scala
@@ -1,7 +1,7 @@
 package spark.repl
 
 import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.net.{URI, URL, URLClassLoader, URLEncoder}
 import java.util.concurrent.{Executors, ExecutorService}
 
 import org.apache.hadoop.conf.Configuration
@@ -12,18 +12,35 @@ import org.objectweb.asm.commons.EmptyVisitor
 import org.objectweb.asm.Opcodes._
 
 
-// A ClassLoader that reads classes from a Hadoop FileSystem URL, used to load
-// classes defined by the interpreter when the REPL is in use
-class ExecutorClassLoader(classDir: String, parent: ClassLoader)
+/**
+ * A ClassLoader that reads classes from a Hadoop FileSystem or an HTTP URI,
+ * used to load classes defined by the interpreter when the REPL is in use.
+ */
+class ExecutorClassLoader(classUri: String, parent: ClassLoader)
 extends ClassLoader(parent) {
-  val fileSystem = FileSystem.get(new URI(classDir), new Configuration())
-  val directory = new URI(classDir).getPath
+  val uri = new URI(classUri)
+  val directory = uri.getPath
+
+  // Hadoop FileSystem for our URI, or null if the URI uses HTTP
+  val fileSystem: FileSystem = {
+    if (uri.getScheme() == "http")
+      null
+    else
+      FileSystem.get(uri, new Configuration())
+  }
   
   override def findClass(name: String): Class[_] = {
     try {
       //println("repl.ExecutorClassLoader resolving " + name)
-      val path = new Path(directory, name.replace('.', '/') + ".class")
-      val bytes = readAndTransformClass(name, fileSystem.open(path))
+      val pathInDirectory = name.replace('.', '/') + ".class"
+      val inputStream = {
+        if (fileSystem != null)
+          fileSystem.open(new Path(directory, pathInDirectory))
+        else
+          new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+      }
+      val bytes =
+        try readAndTransformClass(name, inputStream) finally inputStream.close()
       return defineClass(name, bytes, 0, bytes.length)
     } catch {
       case e: Exception => throw new ClassNotFoundException(name, e)
@@ -57,6 +74,13 @@ extends ClassLoader(parent) {
       return bos.toByteArray
     }
   }
+
+  /**
+   * URL-encode a path, leaving the slashes between segments intact
+   */
+  def urlEncode(str: String): String = {
+    str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
+  }
 }
 
 class ConstructorCleaner(className: String, cv: ClassVisitor)
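Note: URL-encoding each path segment matters because REPL-generated class
names contain '$' characters (nested wrapper classes), which are not safe in
a raw URL path. A self-contained sketch of the fetch path taken when
fileSystem is null; the server URI and class name here are illustrative
assumptions, not values from the patch:

    import java.io.{ByteArrayOutputStream, InputStream}
    import java.net.{URL, URLEncoder}

    object HttpClassFetchSketch {
      // Same strategy as ExecutorClassLoader.urlEncode: encode each
      // slash-separated segment so the '/' separators survive.
      def urlEncode(str: String): String =
        str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")

      // Drain an InputStream into a byte array.
      def readFully(in: InputStream): Array[Byte] = {
        val bos = new ByteArrayOutputStream
        val buf = new Array[Byte](4096)
        var n = in.read(buf)
        while (n != -1) { bos.write(buf, 0, n); n = in.read(buf) }
        bos.toByteArray
      }

      def main(args: Array[String]) {
        val classUri = "http://10.0.0.1:33333"   // hypothetical class server
        val name = "line1.$read$$iw$$iw"         // illustrative REPL class name
        val path = name.replace('.', '/') + ".class"
        // '$' becomes %24, so this requests .../line1/%24read%24%24iw%24%24iw.class
        val in = new URL(classUri + "/" + urlEncode(path)).openStream()
        try {
          println(readFully(in).length + " bytes")
        } finally {
          in.close()
        }
      }
    }
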
diff --git a/src/scala/spark/repl/SparkInterpreter.scala b/src/scala/spark/repl/SparkInterpreter.scala
index 2bfaaf7342..29a4420950 100644
--- a/src/scala/spark/repl/SparkInterpreter.scala
+++ b/src/scala/spark/repl/SparkInterpreter.scala
@@ -90,32 +90,40 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
   
   val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
 
-  /** directory to save .class files to */
-  //val virtualDirectory = new VirtualDirectory("(memory)", None)
-  val virtualDirectory = {
+  val outputDir = {
     val rootDir = new File(System.getProperty("spark.repl.classdir",
                            System.getProperty("java.io.tmpdir")))
     var attempts = 0
     val maxAttempts = 10
-    var outputDir: File = null
-    while (outputDir == null) {
+    var dir: File = null
+    while (dir == null) {
       attempts += 1
       if (attempts > maxAttempts) {
         throw new IOException("Failed to create a temp directory " +
                               "after " + maxAttempts + " attempts!")
       }
       try {
-        outputDir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
-        if (outputDir.exists() || !outputDir.mkdirs())
-          outputDir = null
+        dir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
+        if (dir.exists() || !dir.mkdirs())
+          dir = null
       } catch { case e: IOException => ; }
     }
-    System.setProperty("spark.repl.current.classdir",
-      "file://" + outputDir.getAbsolutePath + "/")
     if (SPARK_DEBUG_REPL)
-      println("Output directory: " + outputDir)
-    new PlainFile(outputDir)
+      println("Output directory: " + dir)
+    dir
   }
+
+  /** directory to save .class files to */
+  //val virtualDirectory = new VirtualDirectory("(memory)", None)
+  val virtualDirectory = new PlainFile(outputDir)
+
+  /** Jetty server that will serve our classes to worker nodes */
+  val classServer = new ClassServer(outputDir)
+
+  // Start the classServer and remember its URI in a Spark system property
+  classServer.start()
+  println("ClassServer started, URI = " + classServer.uri)
+  System.setProperty("spark.repl.class.uri", classServer.uri)
   
   /** reporter */
   object reporter extends ConsoleReporter(settings, null, out) {
@@ -714,6 +722,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
    */
   def close() {
     reporter.flush
+    classServer.stop()
   }
 
   /** A traverser that finds all mentioned identifiers, i.e. things
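Note: the ClassServer referenced above is not included in this diff. For
context, a minimal sketch of what a Jetty 7 server over the output directory
could look like; the class name matches the patch, but everything else here
is an assumption rather than the actual ClassServer.scala:

    import java.io.File
    import java.net.InetAddress

    import org.eclipse.jetty.server.Server
    import org.eclipse.jetty.server.handler.ResourceHandler

    // Hypothetical sketch: serve the REPL's class output directory over
    // HTTP on an ephemeral port and report the resulting base URI.
    class ClassServer(classDir: File) {
      private val server = new Server(0)  // port 0 = pick any free port

      def start() {
        val handler = new ResourceHandler
        handler.setResourceBase(classDir.getAbsolutePath)
        server.setHandler(handler)
        server.start()
      }

      def stop() { server.stop() }

      /** Base URI that executors should use, e.g. http://host:port */
      def uri: String = {
        val port = server.getConnectors()(0).getLocalPort
        "http://" + InetAddress.getLocalHost.getHostAddress + ":" + port
      }
    }
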
-- 
GitLab