diff --git a/.gitignore b/.gitignore
index 8156d6e8c10ea7ccf6ba0fa7c4c8e76c0c3c4f6a..5abdec5d50d8b68b20a6fbd05bdc1a952ac55370 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,4 @@ third_party/libmesos.so
 third_party/libmesos.dylib
 conf/java-opts
 conf/spark-env.sh
+conf/log4j.properties
diff --git a/Makefile b/Makefile
index 9c9ebb6a82cf52f5071f04cc523437ab5c29ac92..c5d004fb108e785581def5ca35961bd07ad8e2c5 100644
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,11 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
 JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
 JARS += third_party/scalatest-1.2/scalatest-1.2.jar
 JARS += third_party/scalacheck_2.8.0-1.7.jar
+JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
+JARS += third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
+JARS += third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
+JARS += third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
 CLASSPATH = $(subst $(SPACE),:,$(JARS))
 
 SCALA_SOURCES =  src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
diff --git a/conf/log4j.properties b/conf/log4j.properties
new file mode 100644
index 0000000000000000000000000000000000000000..d72dbadc3904f327effddf99594045067be2f529
--- /dev/null
+++ b/conf/log4j.properties
@@ -0,0 +1,8 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
diff --git a/run b/run
index 7e044eb021a4b34ea41e2637604aabc665ca6a40..f28b39af9b084e84b3a77b51dda3777593694b2a 100755
--- a/run
+++ b/run
@@ -28,19 +28,24 @@ fi
 export JAVA_OPTS
 
 # Build up classpath
-SPARK_CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
-SPARK_CLASSPATH+=:$FWDIR/third_party/mesos.jar
-SPARK_CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
-SPARK_CLASSPATH+=:$FWDIR/third_party/colt.jar
-SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
-SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
-SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar
-SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar
+CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
+CLASSPATH+=:$FWDIR/conf
+CLASSPATH+=:$FWDIR/third_party/mesos.jar
+CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
+CLASSPATH+=:$FWDIR/third_party/colt.jar
+CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
+CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
+CLASSPATH+=:$FWDIR/third_party/scalatest-1.2/scalatest-1.2.jar
+CLASSPATH+=:$FWDIR/third_party/scalacheck_2.8.0-1.7.jar
+CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
+CLASSPATH+=:$FWDIR/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
+CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
+CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
 for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
-  SPARK_CLASSPATH+=:$jar
+  CLASSPATH+=:$jar
 done
-export SPARK_CLASSPATH
-export CLASSPATH=$SPARK_CLASSPATH # Needed for spark-shell
+export CLASSPATH # Needed for spark-shell
 
 if [ -n "$SCALA_HOME" ]; then
   SCALA=${SCALA_HOME}/bin/scala
@@ -48,4 +53,4 @@ else
   SCALA=scala
 fi
 
-exec $SCALA -cp $SPARK_CLASSPATH $@
+exec $SCALA -cp $CLASSPATH $@
diff --git a/spark-executor b/spark-executor
index ee847cfff03ebb9a78017f078a287ecc4341949c..0f9b9b1ece1a27bef97be6fe9c705a5ee9e6632f 100755
--- a/spark-executor
+++ b/spark-executor
@@ -1,5 +1,4 @@
 #!/bin/sh
-echo "In spark-executor"
 FWDIR="`dirname $0`"
-echo Framework dir: $FWDIR
+echo "Running spark-executor with framework dir = $FWDIR"
 exec $FWDIR/run spark.Executor
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 7fe84da47c94a9c82ff6ad5267d0a2b4cebe859d..fc376359838a2fca60c65d94b58355ee548b82fe 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -33,7 +33,7 @@ trait BroadcastRecipe {
 // TODO: Right, now no parallelization between multiple broadcasts
 @serializable
 class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) 
-  extends BroadcastRecipe {
+extends BroadcastRecipe with Logging {
   
   def value = value_
 
@@ -92,7 +92,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
         } 
         
         val time = (System.nanoTime - start) / 1e9
-        println( System.currentTimeMillis + ": " +  "Reading Broadcasted variable " + uuid + " took " + time + " s")                  
+        logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")                  
       }
     }
   }
@@ -149,7 +149,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
 
 @serializable 
 class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) 
-  extends BroadcastRecipe {
+extends BroadcastRecipe with Logging {
   
   def value = value_
 
@@ -179,7 +179,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
         fileIn.close
         
         val time = (System.nanoTime - start) / 1e9
-        println( System.currentTimeMillis + ": " +  "Reading Broadcasted variable " + uuid + " took " + time + " s")
+        logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
       }
     }
   }
@@ -188,7 +188,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
 @serializable
 case class SourceInfo (val hostAddress: String, val listenPort: Int, 
   val totalBlocks: Int, val totalBytes: Int, val replicaID: Int)  
-  extends Comparable [SourceInfo]{
+extends Comparable[SourceInfo]{
 
   var currentLeechers = 0
   var receptionFailed = false
@@ -231,7 +231,7 @@ private object Broadcast {
   }
 }
 
-private object BroadcastCS {
+private object BroadcastCS extends Logging {
   val values = new MapMaker ().softValues ().makeMap[UUID, Any]
   // val valueInfos = new MapMaker ().softValues ().makeMap[UUID, Any]  
 
@@ -286,15 +286,15 @@ private object BroadcastCS {
           guideMR = new GuideMultipleRequests
           guideMR.setDaemon (true)
           guideMR.start
-          println (System.currentTimeMillis + ": " +  "GuideMultipleRequests started")
+          logInfo("GuideMultipleRequests started")
         }        
         serveMR = new ServeMultipleRequests
         serveMR.setDaemon (true)
         serveMR.start
         
-        println (System.currentTimeMillis + ": " +  "ServeMultipleRequests started")
+        logInfo("ServeMultipleRequests started")
         
-        println (System.currentTimeMillis + ": " +  "BroadcastCS object has been initialized")
+        logInfo("BroadcastCS object has been initialized")
                   
         initialized = true
       }
@@ -352,7 +352,7 @@ private object BroadcastCS {
       // Connect to Master and send this worker's Information
       val clientSocketToMaster = 
         new Socket(BroadcastCS.masterHostAddress, BroadcastCS.masterListenPort)  
-      println (System.currentTimeMillis + ": " +  "Connected to Master's guiding object")
+      logInfo("Connected to Master's guiding object")
       // TODO: Guiding object connection is reusable
       val oisMaster = 
         new ObjectInputStream (clientSocketToMaster.getInputStream)
@@ -371,11 +371,11 @@ private object BroadcastCS {
       }
       totalBytes = sourceInfo.totalBytes
       
-      println (System.currentTimeMillis + ": " +  "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)    
+      logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)    
 
       retByteArray = receiveSingleTransmission (sourceInfo)
       
-      println (System.currentTimeMillis + ": " +  "I got this from receiveSingleTransmission: " + retByteArray)
+      logInfo("I got this from receiveSingleTransmission: " + retByteArray)
 
       // TODO: Update sourceInfo to add error notifactions for Master
       if (retByteArray == null) { sourceInfo.receptionFailed = true }
@@ -414,8 +414,8 @@ private object BroadcastCS {
       oisSource = 
         new ObjectInputStream (clientSocketToSource.getInputStream)
         
-      println (System.currentTimeMillis + ": " +  "Inside receiveSingleTransmission")
-      println (System.currentTimeMillis + ": " +  "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
+      logInfo("Inside receiveSingleTransmission")
+      logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
       retByteArray = new Array[Byte] (totalBytes)
       for (i <- 0 until totalBlocks) {
         val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
@@ -426,14 +426,14 @@ private object BroadcastCS {
         hasBlocksLock.synchronized {
           hasBlocksLock.notifyAll
         }
-        println (System.currentTimeMillis + ": " +  "Received block: " + i + " " + bcBlock)
+        logInfo("Received block: " + i + " " + bcBlock)
       } 
       assert (hasBlocks == totalBlocks)
-      println (System.currentTimeMillis + ": " +  "After the receive loop")
+      logInfo("After the receive loop")
     } catch {
       case e: Exception => { 
         retByteArray = null 
-        println (System.currentTimeMillis + ": " +  "receiveSingleTransmission had a " + e)
+        logInfo("receiveSingleTransmission had a " + e)
       }
     } finally {    
       if (oisSource != null) { oisSource.close }
@@ -446,13 +446,13 @@ private object BroadcastCS {
     return retByteArray
   } 
   
-  class TrackMultipleValues extends Thread {
+  class TrackMultipleValues extends Thread with Logging {
     override def run = {
       var threadPool = Executors.newCachedThreadPool
       var serverSocket: ServerSocket = null
       
       serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
-      println (System.currentTimeMillis + ": " +  "TrackMultipleVariables" + serverSocket + " " + listenPort)
+      logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort)
       
       var keepAccepting = true
       try {
@@ -463,11 +463,11 @@ private object BroadcastCS {
             clientSocket = serverSocket.accept
           } catch {
             case e: Exception => { 
-              println ("TrackMultipleValues Timeout. Stopping listening...") 
+              logInfo("TrackMultipleValues Timeout. Stopping listening...") 
               keepAccepting = false 
             }
           }
-          println (System.currentTimeMillis + ": " +  "TrackMultipleValues:Got new request:" + clientSocket)
+          logInfo("TrackMultipleValues:Got new request:" + clientSocket)
           if (clientSocket != null) {
             try {            
               threadPool.execute (new Runnable {
@@ -506,14 +506,14 @@ private object BroadcastCS {
     
   }
   
-  class GuideMultipleRequests extends Thread {
+  class GuideMultipleRequests extends Thread with Logging {
     override def run = {
       var threadPool = Executors.newCachedThreadPool
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
       // listenPort = BroadcastCS.masterListenPort
-      println (System.currentTimeMillis + ": " +  "GuideMultipleRequests" + serverSocket + " " + listenPort)
+      logInfo("GuideMultipleRequests" + serverSocket + " " + listenPort)
       
       var keepAccepting = true
       try {
@@ -524,12 +524,12 @@ private object BroadcastCS {
             clientSocket = serverSocket.accept
           } catch {
             case e: Exception => { 
-              println ("GuideMultipleRequests Timeout. Stopping listening...") 
+              logInfo("GuideMultipleRequests Timeout. Stopping listening...") 
               keepAccepting = false 
             }
           }
           if (clientSocket != null) {
-            println (System.currentTimeMillis + ": " +  "Guide:Accepted new client connection:" + clientSocket)
+            logInfo("Guide:Accepted new client connection:" + clientSocket)
             try {            
               threadPool.execute (new GuideSingleRequest (clientSocket))
             } catch {
@@ -543,7 +543,8 @@ private object BroadcastCS {
       }
     }
     
-    class GuideSingleRequest (val clientSocket: Socket) extends Runnable {
+    class GuideSingleRequest (val clientSocket: Socket)
+    extends Runnable with Logging {
       private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
       private val ois = new ObjectInputStream (clientSocket.getInputStream)
 
@@ -552,21 +553,21 @@ private object BroadcastCS {
       
       def run = {
         try {
-          println (System.currentTimeMillis + ": " +  "new GuideSingleRequest is running")
+          logInfo("new GuideSingleRequest is running")
           // Connecting worker is sending in its hostAddress and listenPort it will 
           // be listening to. ReplicaID is 0 and other fields are invalid (-1)
           var sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
           
           // Select a suitable source and send it back to the worker
           selectedSourceInfo = selectSuitableSource (sourceInfo)
-          println (System.currentTimeMillis + ": " +  "Sending selectedSourceInfo:" + selectedSourceInfo)
+          logInfo("Sending selectedSourceInfo:" + selectedSourceInfo)
           oos.writeObject (selectedSourceInfo)
           oos.flush
 
           // Add this new (if it can finish) source to the PQ of sources
           thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress, 
             sourceInfo.listenPort, totalBlocks, totalBytes, 0)  
-          println (System.currentTimeMillis + ": " +  "Adding possible new source to pqOfSources: " + thisWorkerInfo)    
+          logInfo("Adding possible new source to pqOfSources: " + thisWorkerInfo)    
           pqOfSources.synchronized {
             pqOfSources.add (thisWorkerInfo)
           }
@@ -642,14 +643,14 @@ private object BroadcastCS {
     }    
   }
 
-  class ServeMultipleRequests extends Thread {
+  class ServeMultipleRequests extends Thread with Logging {
     override def run = {
       var threadPool = Executors.newCachedThreadPool
       var serverSocket: ServerSocket = null
 
       serverSocket = new ServerSocket (0) 
       listenPort = serverSocket.getLocalPort
-      println (System.currentTimeMillis + ": " +  "ServeMultipleRequests" + serverSocket + " " + listenPort)
+      logInfo("ServeMultipleRequests" + serverSocket + " " + listenPort)
       
       listenPortLock.synchronized {
         listenPortLock.notifyAll
@@ -664,12 +665,12 @@ private object BroadcastCS {
             clientSocket = serverSocket.accept
           } catch {
             case e: Exception => { 
-              println ("ServeMultipleRequests Timeout. Stopping listening...") 
+              logInfo("ServeMultipleRequests Timeout. Stopping listening...") 
               keepAccepting = false 
             }
           }
           if (clientSocket != null) {
-            println (System.currentTimeMillis + ": " +  "Serve:Accepted new client connection:" + clientSocket)
+            logInfo("Serve:Accepted new client connection:" + clientSocket)
             try {            
               threadPool.execute (new ServeSingleRequest (clientSocket))
             } catch {
@@ -683,23 +684,24 @@ private object BroadcastCS {
       }
     }
     
-    class ServeSingleRequest (val clientSocket: Socket) extends Runnable {
+    class ServeSingleRequest (val clientSocket: Socket)
+    extends Runnable with Logging {
       private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
       private val ois = new ObjectInputStream (clientSocket.getInputStream)
       
       def run  = {
         try {
-          println (System.currentTimeMillis + ": " +  "new ServeSingleRequest is running")
+          logInfo("new ServeSingleRequest is running")
           sendObject
         } catch {
           // TODO: Need to add better exception handling here
           // If something went wrong, e.g., the worker at the other end died etc. 
           // then close everything up
           case e: Exception => { 
-            println (System.currentTimeMillis + ": " +  "ServeSingleRequest had a " + e)
+            logInfo("ServeSingleRequest had a " + e)
           }
         } finally {
-          println (System.currentTimeMillis + ": " +  "ServeSingleRequest is closing streams and sockets")
+          logInfo("ServeSingleRequest is closing streams and sockets")
           ois.close
           oos.close
           clientSocket.close
@@ -726,7 +728,7 @@ private object BroadcastCS {
           } catch {
             case e: Exception => { }
           }
-          println (System.currentTimeMillis + ": " +  "Send block: " + i + " " + arrayOfBlocks(i))
+          logInfo("Send block: " + i + " " + arrayOfBlocks(i))
         }
       }    
     } 
@@ -734,7 +736,7 @@ private object BroadcastCS {
   }
 }
 
-private object BroadcastCH {
+private object BroadcastCH extends Logging {
   val values = new MapMaker ().softValues ().makeMap[UUID, Any]
 
   private var initialized = false
diff --git a/src/scala/spark/ClosureCleaner.scala b/src/scala/spark/ClosureCleaner.scala
index 8037434c387ad0777774519fb33471e989f6813b..0e0b3954d4db9703da58957b19b5d0af18f3e6dc 100644
--- a/src/scala/spark/ClosureCleaner.scala
+++ b/src/scala/spark/ClosureCleaner.scala
@@ -8,7 +8,7 @@ import org.objectweb.asm.commons.EmptyVisitor
 import org.objectweb.asm.Opcodes._
 
 
-object ClosureCleaner {
+object ClosureCleaner extends Logging {
   private def getClassReader(cls: Class[_]): ClassReader = {
     new ClassReader(cls.getResourceAsStream(
       cls.getName.replaceFirst("^.*\\.", "") + ".class"))
@@ -72,13 +72,13 @@ object ClosureCleaner {
         val field = cls.getDeclaredField(fieldName)
         field.setAccessible(true)
         val value = field.get(obj)
-        //println("1: Setting " + fieldName + " on " + cls + " to " + value);
+        //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
         field.set(outer, value)
       }
     }
     
     if (outer != null) {
-      //println("2: Setting $outer on " + func.getClass + " to " + outer);
+      //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
       val field = func.getClass.getDeclaredField("$outer")
       field.setAccessible(true)
       field.set(func, outer)
@@ -101,7 +101,7 @@ object ClosureCleaner {
       val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
       val obj = newCtor.newInstance().asInstanceOf[AnyRef];
       if (outer != null) {
-        //println("3: Setting $outer on " + cls + " to " + outer);
+        //logInfo("3: Setting $outer on " + cls + " to " + outer);
         val field = cls.getDeclaredField("$outer")
         field.setAccessible(true)
         field.set(obj, outer)
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala
index 68f4cb8aae61cc2b53b93dae0331152b2eee157c..be73aae541d6822f2fdea62350aa09b1ee4e1a5f 100644
--- a/src/scala/spark/Executor.scala
+++ b/src/scala/spark/Executor.scala
@@ -5,10 +5,14 @@ import java.util.concurrent.{Executors, ExecutorService}
 import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver}
 import mesos.{TaskDescription, TaskState, TaskStatus}
 
-object Executor {
+/**
+ * The Mesos executor for Spark.
+ */
+object Executor extends Logging {
   def main(args: Array[String]) {
     System.loadLibrary("mesos")
 
+    // Create a new Executor implementation that will run our tasks
     val exec = new mesos.Executor() {
       var classLoader: ClassLoader = null
       var threadPool: ExecutorService = null
@@ -25,10 +29,10 @@ object Executor {
         // If the REPL is in use, create a ClassLoader that will be able to
         // read new classes defined by the REPL as the user types code
         classLoader = this.getClass.getClassLoader
-        val classDir = System.getProperty("spark.repl.current.classdir")
-        if (classDir != null) {
-          println("Using REPL classdir: " + classDir)
-          classLoader = new repl.ExecutorClassLoader(classDir, classLoader)
+        val classUri = System.getProperty("spark.repl.class.uri")
+        if (classUri != null) {
+          logInfo("Using REPL class URI: " + classUri)
+          classLoader = new repl.ExecutorClassLoader(classUri, classLoader)
         }
         Thread.currentThread.setContextClassLoader(classLoader)
         
@@ -43,7 +47,7 @@ object Executor {
         val arg = desc.getArg
         threadPool.execute(new Runnable() {
           def run() = {
-            println("Running task ID " + taskId)
+            logInfo("Running task ID " + taskId)
             try {
               Accumulators.clear
               val task = Utils.deserialize[Task[Any]](arg, classLoader)
@@ -52,12 +56,11 @@ object Executor {
               val result = new TaskResult(value, accumUpdates)
               d.sendStatusUpdate(new TaskStatus(
                 taskId, TaskState.TASK_FINISHED, Utils.serialize(result)))
-              println("Finished task ID " + taskId)
+              logInfo("Finished task ID " + taskId)
             } catch {
               case e: Exception => {
                 // TODO: Handle errors in tasks less dramatically
-                System.err.println("Exception in task ID " + taskId + ":")
-                e.printStackTrace
+                logError("Exception in task ID " + taskId, e)
                 System.exit(1)
               }
             }
@@ -66,6 +69,7 @@ object Executor {
       }
     }
 
+    // Start it running and connect it to the slave
     new MesosExecutorDriver(exec).run()
   }
 }
diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 595386fceb07a179fc7149ac43793ef2cff75017..886272a8edd14131f096b3ef2a233a901876db1c 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.Reporter
 @serializable class HdfsSplit(@transient s: InputSplit)
 extends Split { 
   val inputSplit = new SerializableWritable[InputSplit](s)
+  override def toString = inputSplit.toString
 }
 
 class HdfsTextFile(sc: SparkContext, path: String)
diff --git a/src/scala/spark/LocalScheduler.scala b/src/scala/spark/LocalScheduler.scala
index a3e0fae0653a2bf068b96f968d3c118e4d1ab523..20954a1224243b68565cdbb63d420656ab345dbd 100644
--- a/src/scala/spark/LocalScheduler.scala
+++ b/src/scala/spark/LocalScheduler.scala
@@ -4,8 +4,10 @@ import java.util.concurrent._
 
 import scala.collection.mutable.Map
 
-// A simple Scheduler implementation that runs tasks locally in a thread pool.
-private class LocalScheduler(threads: Int) extends Scheduler {
+/**
+ * A simple Scheduler implementation that runs tasks locally in a thread pool.
+ */
+private class LocalScheduler(threads: Int) extends Scheduler with Logging {
   var threadPool: ExecutorService =
     Executors.newFixedThreadPool(threads, DaemonThreadFactory)
   
@@ -20,25 +22,24 @@ private class LocalScheduler(threads: Int) extends Scheduler {
     for (i <- 0 until tasks.length) {
       futures(i) = threadPool.submit(new Callable[TaskResult[T]]() {
         def call(): TaskResult[T] = {
-          println("Running task " + i)
+          logInfo("Running task " + i)
           try {
             // Serialize and deserialize the task so that accumulators are
             // changed to thread-local ones; this adds a bit of unnecessary
             // overhead but matches how the Nexus Executor works
             Accumulators.clear
             val bytes = Utils.serialize(tasks(i))
-            println("Size of task " + i + " is " + bytes.size + " bytes")
+            logInfo("Size of task " + i + " is " + bytes.size + " bytes")
             val task = Utils.deserialize[Task[T]](
               bytes, currentThread.getContextClassLoader)
             val value = task.run
             val accumUpdates = Accumulators.values
-            println("Finished task " + i)
+            logInfo("Finished task " + i)
             new TaskResult[T](value, accumUpdates)
           } catch {
             case e: Exception => {
               // TODO: Do something nicer here
-              System.err.println("Exception in task " + i + ":")
-              e.printStackTrace()
+              logError("Exception in task " + i, e)
               System.exit(1)
               null
             }
@@ -58,7 +59,10 @@ private class LocalScheduler(threads: Int) extends Scheduler {
   override def numCores() = threads
 }
 
-// A ThreadFactory that creates daemon threads
+
+/**
+ * A ThreadFactory that creates daemon threads
+ */
 private object DaemonThreadFactory extends ThreadFactory {
   override def newThread(r: Runnable): Thread = {
     val t = new Thread(r);
diff --git a/src/scala/spark/Logging.scala b/src/scala/spark/Logging.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2d1feebbb1ede64fe2899d6a9608b6477e965991
--- /dev/null
+++ b/src/scala/spark/Logging.scala
@@ -0,0 +1,49 @@
+package spark
+
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+/**
+ * Utility trait for classes that want to log data. Creates a SLF4J logger
+ * for the class and allows logging messages at different levels using
+ * methods that only evaluate parameters lazily if the log level is enabled.
+ */
+trait Logging {
+  // Make the log field transient so that objects with Logging can
+  // be serialized and used on another machine
+  @transient private var log_ : Logger = null
+
+  // Method to get or create the logger for this object
+  def log: Logger = {
+    if (log_ == null) {
+      var className = this.getClass().getName()
+      // Strip the single trailing $ that Scala appends to object class names
+      if (className.endsWith("$"))
+        className = className.substring(0, className.length - 1)
+      log_ = LoggerFactory.getLogger(className)
+    }
+    return log_
+  }
+
+  // Log methods that take only a String
+  def logInfo(msg: => String) = if (log.isInfoEnabled) log.info(msg)
+
+  def logDebug(msg: => String) = if (log.isDebugEnabled) log.debug(msg)
+
+  def logWarning(msg: => String) = if (log.isWarnEnabled) log.warn(msg)
+
+  def logError(msg: => String) = if (log.isErrorEnabled) log.error(msg)
+
+  // Log methods that take Throwables (Exceptions/Errors) too
+  def logInfo(msg: => String, throwable: Throwable) =
+    if (log.isInfoEnabled) log.info(msg, throwable)
+
+  def logDebug(msg: => String, throwable: Throwable) =
+    if (log.isDebugEnabled) log.debug(msg, throwable)
+
+  def logWarning(msg: => String, throwable: Throwable) =
+    if (log.isWarnEnabled) log.warn(msg, throwable)
+
+  def logError(msg: => String, throwable: Throwable) =
+    if (log.isErrorEnabled) log.error(msg, throwable)
+}
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 2f1c7431c57a1ac20a40a00ce3fa44788859af79..6575b48e8b495485a1f9cada4da477eaa496d250 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -23,7 +23,7 @@ import mesos._
 //    all the offers to the ParallelOperation and have it load-balance.
 private class MesosScheduler(
   master: String, frameworkName: String, execArg: Array[Byte])
-extends NScheduler with spark.Scheduler
+extends NScheduler with spark.Scheduler with Logging
 {
   // Lock used by runTasks to ensure only one thread can be in it
   val runTasksMutex = new Object()
@@ -101,7 +101,7 @@ extends NScheduler with spark.Scheduler
   }
 
   override def registered(d: SchedulerDriver, frameworkId: String) {
-    println("Registered as framework ID " + frameworkId)
+    logInfo("Registered as framework ID " + frameworkId)
     registeredLock.synchronized {
       isRegistered = true
       registeredLock.notifyAll()
@@ -137,7 +137,7 @@ extends NScheduler with spark.Scheduler
                 case None => {}
               }
             } catch {
-              case e: Exception => e.printStackTrace
+              case e: Exception => logError("Exception in resourceOffer", e)
             }
           }
         }
@@ -157,11 +157,11 @@ extends NScheduler with spark.Scheduler
               activeOps(opId).statusUpdate(status)
             }
           case None =>
-            println("TID " + status.getTaskId + "already finished")
+            logInfo("TID " + status.getTaskId + " already finished")
         }
 
       } catch {
-        case e: Exception => e.printStackTrace
+        case e: Exception => logError("Exception in statusUpdate", e)
       }
     }
   }
@@ -173,12 +173,11 @@ extends NScheduler with spark.Scheduler
           try {
             activeOp.error(code, message)
           } catch {
-            case e: Exception => e.printStackTrace
+            case e: Exception => logError("Exception in error callback", e)
           }
         }
       } else {
-        val msg = "Mesos error: %s (error code: %d)".format(message, code)
-        System.err.println(msg)
+        logError("Mesos error: %s (error code: %d)".format(message, code))
         System.exit(1)
       }
     }
@@ -205,10 +204,10 @@ trait ParallelOperation {
 
 class SimpleParallelOperation[T: ClassManifest](
   sched: MesosScheduler, tasks: Array[Task[T]], val opId: Int)
-extends ParallelOperation
+extends ParallelOperation with Logging
 {
   // Maximum time to wait to run a task in a preferred location (in ms)
-  val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "1000").toLong
+  val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
 
   val callingThread = currentThread
   val numTasks = tasks.length
@@ -261,9 +260,11 @@ extends ParallelOperation
           val taskId = sched.newTaskId()
           sched.taskIdToOpId(taskId) = opId
           tidToIndex(taskId) = i
-          printf("Starting task %d as opId %d, TID %s on slave %s: %s (%s)",
-            i, opId, taskId, offer.getSlaveId, offer.getHost, 
-            if(checkPref) "preferred" else "non-preferred")
+          val preferred = if(checkPref) "preferred" else "non-preferred"
+          val message =
+            "Starting task %d as opId %d, TID %s on slave %s: %s (%s)".format(
+              i, opId, taskId, offer.getSlaveId, offer.getHost, preferred)
+          logInfo(message)
           tasks(i).markStarted(offer)
           launched(i) = true
           tasksLaunched += 1
@@ -273,7 +274,7 @@ extends ParallelOperation
           params.put("cpus", "" + desiredCpus)
           params.put("mem", "" + desiredMem)
           val serializedTask = Utils.serialize(tasks(i))
-          println("... Serialized size: " + serializedTask.size)
+          //logInfo("Serialized size: " + serializedTask.size)
           return Some(new TaskDescription(taskId, offer.getSlaveId,
             "task_" + taskId, params, serializedTask))
         }
@@ -298,37 +299,40 @@ extends ParallelOperation
 
   def taskFinished(status: TaskStatus) {
     val tid = status.getTaskId
-    print("Finished opId " + opId + " TID " + tid)
-    if (!finished(tidToIndex(tid))) {
+    val index = tidToIndex(tid)
+    if (!finished(index)) {
+      tasksFinished += 1
+      logInfo("Finished opId %d TID %d (progress: %d/%d)".format(
+        opId, tid, tasksFinished, numTasks))
       // Deserialize task result
       val result = Utils.deserialize[TaskResult[T]](status.getData)
-      results(tidToIndex(tid)) = result.value
+      results(index) = result.value
       // Update accumulators
       print(" with " + result.accumUpdates.size + " accumulatedUpdates")
       Accumulators.add(callingThread, result.accumUpdates)
       // Mark finished and stop if we've finished all the tasks
-      finished(tidToIndex(tid)) = true
+      finished(index) = true
       // Remove TID -> opId mapping from sched
       sched.taskIdToOpId.remove(tid)
-      tasksFinished += 1
-
-      println(", finished " + tasksFinished + "/" + numTasks)
       if (tasksFinished == numTasks)
         setAllFinished()
     } else {
-      printf("... Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+      logInfo("Ignoring task-finished event for TID " + tid +
+        " because task " + index + " is already finished")
     }
   }
 
   def taskLost(status: TaskStatus) {
     val tid = status.getTaskId
-    println("Lost opId " + opId + " TID " + tid)
-    if (!finished(tidToIndex(tid))) {
-      launched(tidToIndex(tid)) = false
+    val index = tidToIndex(tid)
+    if (!finished(index)) {
+      logInfo("Lost opId " + opId + " TID " + tid)
+      launched(index) = false
       sched.taskIdToOpId.remove(tid)
       tasksLaunched -= 1
     } else {
-      printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+      logInfo("Ignoring task-lost event for TID " + tid +
+        " because task " + index + " is already finished")
     }
   }
 
diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index f8d1ab7f570cfade40d53f891c0911f5d8fca664..5236eb958ffb4ae9023deebef784a3ab7275b37c 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -7,6 +7,7 @@ import java.util.Random
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.Map
+import scala.collection.mutable.HashMap
 
 import mesos._
 
@@ -27,7 +28,12 @@ abstract class RDD[T: ClassManifest](
   def filter(f: T => Boolean) = new FilteredRDD(this, sc.clean(f))
   def aggregateSplit() = new SplitRDD(this)
   def cache() = new CachedRDD(this)
-  def sample(withReplacement: Boolean, frac: Double, seed: Int) = new SampledRDD(this, withReplacement, frac, seed)
+
+  def sample(withReplacement: Boolean, frac: Double, seed: Int) =
+    new SampledRDD(this, withReplacement, frac, seed)
+
+  def flatMap[U: ClassManifest](f: T => Traversable[U]) =
+    new FlatMappedRDD(this, sc.clean(f))
 
   def foreach(f: T => Unit) {
     val cleanF = sc.clean(f)
@@ -93,27 +99,27 @@ extends Task[U] {
 
 class ForeachTask[T: ClassManifest](
   rdd: RDD[T], split: Split, func: T => Unit)
-extends RDDTask[Unit, T](rdd, split) {
+extends RDDTask[Unit, T](rdd, split) with Logging {
   override def run() {
-    println("Processing " + split)
+    logInfo("Processing " + split)
     rdd.iterator(split).foreach(func)
   }
 }
 
 class CollectTask[T](
   rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
-extends RDDTask[Array[T], T](rdd, split) {
+extends RDDTask[Array[T], T](rdd, split) with Logging {
   override def run(): Array[T] = {
-    println("Processing " + split)
+    logInfo("Processing " + split)
     rdd.iterator(split).toArray(m)
   }
 }
 
 class ReduceTask[T: ClassManifest](
   rdd: RDD[T], split: Split, f: (T, T) => T)
-extends RDDTask[Option[T], T](rdd, split) {
+extends RDDTask[Option[T], T](rdd, split) with Logging {
   override def run(): Option[T] = {
-    println("Processing " + split)
+    logInfo("Processing " + split)
     val iter = rdd.iterator(split)
     if (iter.hasNext)
       Some(iter.reduceLeft(f))
@@ -140,6 +146,16 @@ extends RDD[T](prev.sparkContext) {
   override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
 }
 
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+  prev: RDD[T], f: T => Traversable[U]) 
+extends RDD[U](prev.sparkContext) {
+  override def splits = prev.splits
+  override def preferredLocations(split: Split) = prev.preferredLocations(split)
+  override def iterator(split: Split) =
+    prev.iterator(split).toStream.flatMap(f).iterator
+  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
+}
+
 class SplitRDD[T: ClassManifest](prev: RDD[T]) 
 extends RDD[Array[T]](prev.sparkContext) {
   override def splits = prev.splits
@@ -149,7 +165,10 @@ extends RDD[Array[T]](prev.sparkContext) {
 }
 
 
-@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {}
+@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {
+  override def toString() =
+    "SeededSplit(" + prev.toString + ", seed " + seed + ")"
+}
 
 class SampledRDD[T: ClassManifest](
   prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int) 
@@ -183,7 +202,7 @@ extends RDD[T](prev.sparkContext) {
 
 class CachedRDD[T](
   prev: RDD[T])(implicit m: ClassManifest[T])
-extends RDD[T](prev.sparkContext) {
+extends RDD[T](prev.sparkContext) with Logging {
   val id = CachedRDD.newId()
   @transient val cacheLocs = Map[Split, List[String]]()
 
@@ -198,6 +217,7 @@ extends RDD[T](prev.sparkContext) {
   
   override def iterator(split: Split): Iterator[T] = {
     val key = id + "::" + split.toString
+    logInfo("CachedRDD split key is " + key)
     val cache = CachedRDD.cache
     val loading = CachedRDD.loading
     val cachedVal = cache.get(key)
@@ -217,7 +237,7 @@ extends RDD[T](prev.sparkContext) {
         }
       }
       // If we got here, we have to load the split
-      println("Loading and caching " + split)
+      logInfo("Loading and caching " + split)
       val array = prev.iterator(split).toArray(m)
       cache.put(key, array)
       loading.synchronized {
@@ -251,6 +271,7 @@ private object CachedRDD {
 abstract class UnionSplit[T: ClassManifest] extends Split {
   def iterator(): Iterator[T]
   def preferredLocations(): Seq[String]
+  def toString(): String
 }
 
 @serializable
@@ -259,6 +280,8 @@ class UnionSplitImpl[T: ClassManifest](
 extends UnionSplit[T] {
   override def iterator() = rdd.iterator(split)
   override def preferredLocations() = rdd.preferredLocations(split)
+  override def toString() =
+    "UnionSplitImpl(" + split.toString + ")"
 }
 
 @serializable
@@ -280,7 +303,10 @@ extends RDD[T](sc) {
     s.asInstanceOf[UnionSplit[T]].preferredLocations()
 }
 
-@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {}
+@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {
+  override def toString() =
+    "CartesianSplit(" + s1.toString + ", " + s2.toString + ")"
+}
 
 @serializable
 class CartesianRDD[T: ClassManifest, U:ClassManifest](
@@ -309,3 +335,18 @@ extends RDD[Pair[T, U]](sc) {
     rdd2.taskStarted(currSplit.s2, slot)
   }
 }
+
+@serializable class PairRDDExtras[K, V](rdd: RDD[(K, V)]) {
+  def reduceByKey(func: (V, V) => V): Map[K, V] = {
+    def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
+      for ((k, v) <- m2) {
+        m1.get(k) match {
+          case None => m1(k) = v
+          case Some(w) => m1(k) = func(w, v)
+        }
+      }
+      return m1
+    }
+    rdd.map(pair => HashMap(pair)).reduce(mergeMaps)
+  }
+}
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index c26032bb4f3d7f13a6ac518c50a5c74e807ab506..62e49271bfdcd2ce143030294c0cf96f80e3b778 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -6,7 +6,7 @@ import java.util.UUID
 import scala.collection.mutable.ArrayBuffer
 import scala.actors.Actor._
 
-class SparkContext(master: String, frameworkName: String) {
+class SparkContext(master: String, frameworkName: String) extends Logging {
   Broadcast.initialize(true)
 
   def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int) =
@@ -56,10 +56,10 @@ class SparkContext(master: String, frameworkName: String) {
 
   private[spark] def runTaskObjects[T: ClassManifest](tasks: Seq[Task[T]])
       : Array[T] = {
-    println("Running " + tasks.length + " tasks in parallel")
+    logInfo("Running " + tasks.length + " tasks in parallel")
     val start = System.nanoTime
     val result = scheduler.runTasks(tasks.toArray)
-    println("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s")
     return result
   }
 
@@ -85,9 +85,14 @@ object SparkContext {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
   }
+
   implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int) = 0
   }
+
   // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+
+  implicit def rddToPairRDDExtras[K, V](rdd: RDD[(K, V)]) =
+    new PairRDDExtras(rdd)
 }
diff --git a/src/scala/spark/Utils.scala b/src/scala/spark/Utils.scala
index 52bcb89f003fc226c93c8c74d6e2d9bc36efa49f..27d73aefbd69420f0e6aca4ed33e7cb339676cf7 100644
--- a/src/scala/spark/Utils.scala
+++ b/src/scala/spark/Utils.scala
@@ -2,7 +2,9 @@ package spark
 
 import java.io._
 
-private object Utils {
+import scala.collection.mutable.ArrayBuffer
+
+object Utils {
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream
     val oos = new ObjectOutputStream(bos)
@@ -25,4 +27,27 @@ private object Utils {
     }
     return ois.readObject.asInstanceOf[T]
   }
+
+  def isAlpha(c: Char) = {
+    (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
+  }
+
+  def splitWords(s: String): Seq[String] = {
+    val buf = new ArrayBuffer[String]
+    var i = 0
+    while (i < s.length) {
+      var j = i
+      while (j < s.length && isAlpha(s.charAt(j))) {
+        j += 1
+      }
+      if (j > i) {
+        buf += s.substring(i, j);
+      }
+      i = j
+      while (i < s.length && !isAlpha(s.charAt(i))) {
+        i += 1
+      }
+    }
+    return buf
+  }
 }
diff --git a/src/scala/spark/repl/ClassServer.scala b/src/scala/spark/repl/ClassServer.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6a40d92765b922cd9ca72fd9d91e13a7667288ad
--- /dev/null
+++ b/src/scala/spark/repl/ClassServer.scala
@@ -0,0 +1,77 @@
+package spark.repl
+
+import java.io.File
+import java.net.InetAddress
+
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.handler.DefaultHandler
+import org.eclipse.jetty.server.handler.HandlerList
+import org.eclipse.jetty.server.handler.ResourceHandler
+
+import spark.Logging
+
+
+/**
+ * Exception type thrown by ClassServer when it is in the wrong state 
+ * for an operation.
+ */
+class ServerStateException(message: String) extends Exception(message)
+
+
+/**
+ * An HTTP server used by the interpreter to allow worker nodes to access
+ * class files created as the user types in lines of code. This is just a
+ * wrapper around a Jetty embedded HTTP server.
+ */
+class ClassServer(classDir: File) extends Logging {
+  private var server: Server = null
+  private var port: Int = -1
+
+  def start() {
+    if (server != null) {
+      throw new ServerStateException("Server is already started")
+    } else {
+      server = new Server(0)
+      val resHandler = new ResourceHandler
+      resHandler.setResourceBase(classDir.getAbsolutePath)
+      val handlerList = new HandlerList
+      handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+      server.setHandler(handlerList)
+      server.start()
+      port = server.getConnectors()(0).getLocalPort()
+      logDebug("ClassServer started at " + uri)
+    }
+  }
+
+  def stop() {
+    if (server == null) {
+      throw new ServerStateException("Server is already stopped")
+    } else {
+      server.stop()
+      port = -1
+      server = null
+    }
+  }
+
+  /**
+   * Get the URI of this HTTP server (http://host:port)
+   */
+  def uri: String = {
+    if (server == null) {
+      throw new ServerStateException("Server is not started")
+    } else {
+      return "http://" + getLocalIpAddress + ":" + port
+    }
+  }
+
+  /**
+   * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4)
+   */
+  private def getLocalIpAddress: String = {
+    // Get local IP as an array of four bytes
+    val bytes = InetAddress.getLocalHost().getAddress()
+    // Convert the bytes to ints (keeping in mind that they may be negative)
+    // and join them into a string
+    return bytes.map(b => (b.toInt + 256) % 256).mkString(".")
+  }
+}
diff --git a/src/scala/spark/repl/ExecutorClassLoader.scala b/src/scala/spark/repl/ExecutorClassLoader.scala
index 7d91b20e792e1c0290cfc9433268768ddf77410a..13d81ec1cf096befaff8b7e9901b783faa17212e 100644
--- a/src/scala/spark/repl/ExecutorClassLoader.scala
+++ b/src/scala/spark/repl/ExecutorClassLoader.scala
@@ -1,7 +1,7 @@
 package spark.repl
 
 import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.net.{URI, URL, URLClassLoader, URLEncoder}
 import java.util.concurrent.{Executors, ExecutorService}
 
 import org.apache.hadoop.conf.Configuration
@@ -12,18 +12,34 @@ import org.objectweb.asm.commons.EmptyVisitor
 import org.objectweb.asm.Opcodes._
 
 
-// A ClassLoader that reads classes from a Hadoop FileSystem URL, used to load
-// classes defined by the interpreter when the REPL is in use
-class ExecutorClassLoader(classDir: String, parent: ClassLoader)
+/**
+ * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
+ * used to load classes defined by the interpreter when the REPL is used
+ */
+class ExecutorClassLoader(classUri: String, parent: ClassLoader)
 extends ClassLoader(parent) {
-  val fileSystem = FileSystem.get(new URI(classDir), new Configuration())
-  val directory = new URI(classDir).getPath
+  val uri = new URI(classUri)
+  val directory = uri.getPath
+
+  // Hadoop FileSystem object for our URI, if it isn't using HTTP
+  var fileSystem: FileSystem = {
+    if (uri.getScheme() == "http")
+      null
+    else
+      FileSystem.get(uri, new Configuration())
+  }
   
   override def findClass(name: String): Class[_] = {
     try {
-      //println("repl.ExecutorClassLoader resolving " + name)
-      val path = new Path(directory, name.replace('.', '/') + ".class")
-      val bytes = readAndTransformClass(name, fileSystem.open(path))
+      val pathInDirectory = name.replace('.', '/') + ".class"
+      val inputStream = {
+        if (fileSystem != null)
+          fileSystem.open(new Path(directory, pathInDirectory))
+        else
+          new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+      }
+      val bytes = readAndTransformClass(name, inputStream)
+      inputStream.close()
       return defineClass(name, bytes, 0, bytes.length)
     } catch {
       case e: Exception => throw new ClassNotFoundException(name, e)
@@ -57,6 +73,13 @@ extends ClassLoader(parent) {
       return bos.toByteArray
     }
   }
+
+  /**
+   * URL-encode a string, preserving only slashes
+   */
+  def urlEncode(str: String): String = {
+    str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
+  }
 }
 
 class ConstructorCleaner(className: String, cv: ClassVisitor)
@@ -68,7 +91,6 @@ extends ClassAdapter(cv) {
       // This is the constructor, time to clean it; just output some new
       // instructions to mv that create the object and set the static MODULE$
       // field in the class to point to it, but do nothing otherwise.
-      //println("Cleaning constructor of " + className)
       mv.visitCode()
       mv.visitVarInsn(ALOAD, 0) // load this
       mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V")
diff --git a/src/scala/spark/repl/SparkInterpreter.scala b/src/scala/spark/repl/SparkInterpreter.scala
index 2bfaaf73421c8fcbcd0f19d00adcc2142be36687..ae2e7e8a681a5d9c466c8e0db804ad9c1052aa6d 100644
--- a/src/scala/spark/repl/SparkInterpreter.scala
+++ b/src/scala/spark/repl/SparkInterpreter.scala
@@ -90,31 +90,44 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
   
   val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
 
-  /** directory to save .class files to */
-  //val virtualDirectory = new VirtualDirectory("(memory)", None)
-  val virtualDirectory = {
+  /** Local directory to save .class files to */
+  val outputDir = {
     val rootDir = new File(System.getProperty("spark.repl.classdir",
                            System.getProperty("java.io.tmpdir")))
     var attempts = 0
     val maxAttempts = 10
-    var outputDir: File = null
-    while (outputDir == null) {
+    var dir: File = null
+    while (dir == null) {
       attempts += 1
       if (attempts > maxAttempts) {
         throw new IOException("Failed to create a temp directory " +
                               "after " + maxAttempts + " attempts!")
       }
       try {
-        outputDir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
-        if (outputDir.exists() || !outputDir.mkdirs())
-          outputDir = null
+        dir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
+        if (dir.exists() || !dir.mkdirs())
+          dir = null
       } catch { case e: IOException => ; }
     }
-    System.setProperty("spark.repl.current.classdir",
-      "file://" + outputDir.getAbsolutePath + "/")
-    if (SPARK_DEBUG_REPL)
-      println("Output directory: " + outputDir)
-    new PlainFile(outputDir)
+    if (SPARK_DEBUG_REPL) {
+      println("Output directory: " + dir)
+    }
+    dir
+  }
+
+  /** Scala compiler virtual directory for outputDir */
+  //val virtualDirectory = new VirtualDirectory("(memory)", None)
+  val virtualDirectory = new PlainFile(outputDir)
+
+  /** Jetty server that will serve our classes to worker nodes */
+  val classServer = new ClassServer(outputDir)
+
+  // Start the classServer and store its URI in a spark system property
+  // (which will be passed to executors so that they can connect to it)
+  classServer.start()
+  System.setProperty("spark.repl.class.uri", classServer.uri)
+  if (SPARK_DEBUG_REPL) {
+    println("ClassServer started, URI = " + classServer.uri)
   }
   
   /** reporter */
@@ -714,6 +727,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
    */
   def close() {
     reporter.flush
+    classServer.stop()
   }
 
   /** A traverser that finds all mentioned identifiers, i.e. things
diff --git a/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar b/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
new file mode 100644
index 0000000000000000000000000000000000000000..3f9d847618bcec9c81f36ad6ee76a0f53816987f
Binary files /dev/null and b/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar differ
diff --git a/third_party/hadoop-0.20.0/lib/slf4j-api-1.4.3.jar b/third_party/hadoop-0.20.0/lib/slf4j-api-1.4.3.jar
deleted file mode 100644
index b34fe8d82de073b1c14316c3228bb221fa623ae2..0000000000000000000000000000000000000000
Binary files a/third_party/hadoop-0.20.0/lib/slf4j-api-1.4.3.jar and /dev/null differ
diff --git a/third_party/hadoop-0.20.0/lib/slf4j-log4j12-1.4.3.jar b/third_party/hadoop-0.20.0/lib/slf4j-log4j12-1.4.3.jar
deleted file mode 100644
index 70082fc382bed73c3eb699b988e2ea1de0c95aac..0000000000000000000000000000000000000000
Binary files a/third_party/hadoop-0.20.0/lib/slf4j-log4j12-1.4.3.jar and /dev/null differ
diff --git a/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar b/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
new file mode 100644
index 0000000000000000000000000000000000000000..d9ef50be6d1c2f767a5cb9491a983e1100667365
Binary files /dev/null and b/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar differ
diff --git a/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar b/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
new file mode 100644
index 0000000000000000000000000000000000000000..fb5249346847105c26d30da1048d8e2c364e7d6f
Binary files /dev/null and b/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar differ
diff --git a/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar b/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
new file mode 100644
index 0000000000000000000000000000000000000000..42e0ad0de7773da9b94b12f503deda7f5a506015
Binary files /dev/null and b/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar differ
diff --git a/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar b/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
new file mode 100644
index 0000000000000000000000000000000000000000..873d11983e18b71aeafbc3d805495d6842339812
Binary files /dev/null and b/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar differ