diff --git a/Makefile b/Makefile index 9c9ebb6a82cf52f5071f04cc523437ab5c29ac92..1a3e97a7bea9b9f1353f5bbdf77206637e115f6d 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,8 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar JARS += third_party/scalatest-1.2/scalatest-1.2.jar JARS += third_party/scalacheck_2.8.0-1.7.jar +JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar +JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar CLASSPATH = $(subst $(SPACE),:,$(JARS)) SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala diff --git a/run b/run index 7e044eb021a4b34ea41e2637604aabc665ca6a40..33eec9fe1b21dcebe2510c9351c64e3b91cd5f41 100755 --- a/run +++ b/run @@ -36,6 +36,8 @@ SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar +SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar +SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do SPARK_CLASSPATH+=:$jar done diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala index 68f4cb8aae61cc2b53b93dae0331152b2eee157c..58b20b41dc1fde1ec4200b534a562387edba8540 100644 --- a/src/scala/spark/Executor.scala +++ b/src/scala/spark/Executor.scala @@ -25,10 +25,10 @@ object Executor { // If the REPL is in use, create a ClassLoader that will be able to // read new classes defined by the REPL as the user types code classLoader = this.getClass.getClassLoader - val classDir = System.getProperty("spark.repl.current.classdir") - if (classDir != null) { - println("Using REPL classdir: " + classDir) - classLoader = new repl.ExecutorClassLoader(classDir, classLoader) + val classUri = System.getProperty("spark.repl.class.uri") + if (classUri != null) { + println("Using REPL class URI: " + classUri) + classLoader = new repl.ExecutorClassLoader(classUri, classLoader) } Thread.currentThread.setContextClassLoader(classLoader) diff --git a/src/scala/spark/repl/ClassServer.scala b/src/scala/spark/repl/ClassServer.scala new file mode 100644 index 0000000000000000000000000000000000000000..14ab2fe2a30ef2d827ea08a2d0e0599fdbfde026 --- /dev/null +++ b/src/scala/spark/repl/ClassServer.scala @@ -0,0 +1,74 @@ +package spark.repl + +import java.io.File +import java.net.InetAddress + +import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.handler.DefaultHandler +import org.eclipse.jetty.server.handler.HandlerList +import org.eclipse.jetty.server.handler.ResourceHandler + + +/** + * Exception type thrown by ClassServer when it is in the wrong state + * for an operation. + */ +class ServerStateException(message: String) extends Exception(message) + + +/** + * An HTTP server used by the interpreter to allow worker nodes to access + * class files created as the user types in lines of code. This is just a + * wrapper around a Jetty embedded HTTP server. + */ +class ClassServer(classDir: File) { + private var server: Server = null + private var port: Int = -1 + + def start() { + if (server != null) { + throw new ServerStateException("Server is already started") + } else { + server = new Server(0) + val resHandler = new ResourceHandler + resHandler.setResourceBase(classDir.getAbsolutePath) + val handlerList = new HandlerList + handlerList.setHandlers(Array(resHandler, new DefaultHandler)) + server.setHandler(handlerList) + server.start() + port = server.getConnectors()(0).getLocalPort() + } + } + + def stop() { + if (server == null) { + throw new ServerStateException("Server is already stopped") + } else { + server.stop() + port = -1 + server = null + } + } + + /** + * Get the URI of this HTTP server (http://host:port) + */ + def uri: String = { + if (server == null) { + throw new ServerStateException("Server is not started") + } else { + return "http://" + getLocalIpAddress + ":" + port + } + } + + /** + * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4) + */ + private def getLocalIpAddress: String = { + // Get local IP as an array of four bytes + val bytes = InetAddress.getLocalHost().getAddress() + // Convert the bytes to ints (keeping in mind that they may be negative) + // and join them into a string + return bytes.map(b => (b.toInt + 256) % 256).mkString(".") + } +} diff --git a/src/scala/spark/repl/ExecutorClassLoader.scala b/src/scala/spark/repl/ExecutorClassLoader.scala index 7d91b20e792e1c0290cfc9433268768ddf77410a..0833b319f7f3d564713d91938e094ccf01e6a9d0 100644 --- a/src/scala/spark/repl/ExecutorClassLoader.scala +++ b/src/scala/spark/repl/ExecutorClassLoader.scala @@ -1,7 +1,7 @@ package spark.repl import java.io.{ByteArrayOutputStream, InputStream} -import java.net.{URI, URL, URLClassLoader} +import java.net.{URI, URL, URLClassLoader, URLEncoder} import java.util.concurrent.{Executors, ExecutorService} import org.apache.hadoop.conf.Configuration @@ -12,18 +12,35 @@ import org.objectweb.asm.commons.EmptyVisitor import org.objectweb.asm.Opcodes._ -// A ClassLoader that reads classes from a Hadoop FileSystem URL, used to load -// classes defined by the interpreter when the REPL is in use -class ExecutorClassLoader(classDir: String, parent: ClassLoader) +/** + * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI, + * used to load classes defined by the interpreter when the REPL is used + */ +class ExecutorClassLoader(classUri: String, parent: ClassLoader) extends ClassLoader(parent) { - val fileSystem = FileSystem.get(new URI(classDir), new Configuration()) - val directory = new URI(classDir).getPath + val uri = new URI(classUri) + val directory = uri.getPath + + // Hadoop FileSystem object for our URI, if it isn't using HTTP + var fileSystem: FileSystem = { + if (uri.getScheme() == "http") + null + else + FileSystem.get(uri, new Configuration()) + } override def findClass(name: String): Class[_] = { try { //println("repl.ExecutorClassLoader resolving " + name) - val path = new Path(directory, name.replace('.', '/') + ".class") - val bytes = readAndTransformClass(name, fileSystem.open(path)) + val pathInDirectory = name.replace('.', '/') + ".class" + val inputStream = { + if (fileSystem != null) + fileSystem.open(new Path(directory, pathInDirectory)) + else + new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream() + } + val bytes = readAndTransformClass(name, inputStream) + inputStream.close() return defineClass(name, bytes, 0, bytes.length) } catch { case e: Exception => throw new ClassNotFoundException(name, e) @@ -57,6 +74,13 @@ extends ClassLoader(parent) { return bos.toByteArray } } + + /** + * URL-encode a string, preserving only slashes + */ + def urlEncode(str: String): String = { + str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/") + } } class ConstructorCleaner(className: String, cv: ClassVisitor) diff --git a/src/scala/spark/repl/SparkInterpreter.scala b/src/scala/spark/repl/SparkInterpreter.scala index 2bfaaf73421c8fcbcd0f19d00adcc2142be36687..29a442095020dec8133c6a050493df2c1622b4dd 100644 --- a/src/scala/spark/repl/SparkInterpreter.scala +++ b/src/scala/spark/repl/SparkInterpreter.scala @@ -90,32 +90,40 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) { val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1") - /** directory to save .class files to */ - //val virtualDirectory = new VirtualDirectory("(memory)", None) - val virtualDirectory = { + val outputDir = { val rootDir = new File(System.getProperty("spark.repl.classdir", System.getProperty("java.io.tmpdir"))) var attempts = 0 val maxAttempts = 10 - var outputDir: File = null - while (outputDir == null) { + var dir: File = null + while (dir == null) { attempts += 1 if (attempts > maxAttempts) { throw new IOException("Failed to create a temp directory " + "after " + maxAttempts + " attempts!") } try { - outputDir = new File(rootDir, "spark-" + UUID.randomUUID.toString) - if (outputDir.exists() || !outputDir.mkdirs()) - outputDir = null + dir = new File(rootDir, "spark-" + UUID.randomUUID.toString) + if (dir.exists() || !dir.mkdirs()) + dir = null } catch { case e: IOException => ; } } - System.setProperty("spark.repl.current.classdir", - "file://" + outputDir.getAbsolutePath + "/") if (SPARK_DEBUG_REPL) - println("Output directory: " + outputDir) - new PlainFile(outputDir) + println("Output directory: " + dir) + dir } + + /** directory to save .class files to */ + //val virtualDirectory = new VirtualDirectory("(memory)", None) + val virtualDirectory = new PlainFile(outputDir) + + /** Jetty server that will serve our classes to worker nodes */ + val classServer = new ClassServer(outputDir) + + // Start the classServer and remember its URI in a spark system property */ + classServer.start() + println("ClassServer started, URI = " + classServer.uri) + System.setProperty("spark.repl.class.uri", classServer.uri) /** reporter */ object reporter extends ConsoleReporter(settings, null, out) { @@ -714,6 +722,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) { */ def close() { reporter.flush + classServer.stop() } /** A traverser that finds all mentioned identifiers, i.e. things diff --git a/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar b/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar new file mode 100644 index 0000000000000000000000000000000000000000..d9ef50be6d1c2f767a5cb9491a983e1100667365 Binary files /dev/null and b/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar differ diff --git a/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar b/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar new file mode 100644 index 0000000000000000000000000000000000000000..fb5249346847105c26d30da1048d8e2c364e7d6f Binary files /dev/null and b/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar differ