diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f14f804b3ab4c10756b8771e2e8df651c7ec3210
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -0,0 +1,57 @@
+package spark.deploy
+
+import master.{JobInfo, WorkerInfo}
+import cc.spray.json._
+
+/**
+ * spray-json helper class containing implicit conversion to json for marshalling responses
+ */
+private[spark] object JsonProtocol extends DefaultJsonProtocol {
+  implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] {
+    def write(obj: WorkerInfo) = JsObject(
+      "id" -> JsString(obj.id),
+      "host" -> JsString(obj.host),
+      "webuiaddress" -> JsString(obj.webUiAddress),
+      "cores" -> JsNumber(obj.cores),
+      "coresused" -> JsNumber(obj.coresUsed),
+      "memory" -> JsNumber(obj.memory),
+      "memoryused" -> JsNumber(obj.memoryUsed)
+    )
+  }
+
+  implicit object JobInfoJsonFormat extends RootJsonWriter[JobInfo] {
+    def write(obj: JobInfo) = JsObject(
+      "starttime" -> JsNumber(obj.startTime),
+      "id" -> JsString(obj.id),
+      "name" -> JsString(obj.desc.name),
+      "cores" -> JsNumber(obj.desc.cores),
+      "user" -> JsString(obj.desc.user),
+      "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave),
+      "submitdate" -> JsString(obj.submitDate.toString))
+  }
+
+  implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] {
+    def write(obj: MasterState) = JsObject(
+      "url" -> JsString("spark://" + obj.uri),
+      "workers" -> JsArray(obj.workers.toList.map(_.toJson)),
+      "cores" -> JsNumber(obj.workers.map(_.cores).sum),
+      "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum),
+      "memory" -> JsNumber(obj.workers.map(_.memory).sum),
+      "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum),
+      "activejobs" -> JsArray(obj.activeJobs.toList.map(_.toJson)),
+      "completedjobs" -> JsArray(obj.completedJobs.toList.map(_.toJson))
+    )
+  }
+
+  implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] {
+    def write(obj: WorkerState) = JsObject(
+      "id" -> JsString(obj.workerId),
+      "masterurl" -> JsString(obj.masterUrl),
+      "masterwebuiurl" -> JsString(obj.masterWebUiUrl),
+      "cores" -> JsNumber(obj.cores),
+      "coresused" -> JsNumber(obj.coresUsed),
+      "memory" -> JsNumber(obj.memory),
+      "memoryused" -> JsNumber(obj.memoryUsed)
+    )
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 3cdd3721f56cfb5cc6de88d51b9c48ffcc50315c..a96b55d6f32ca2f9773f5f74a94641afb689f1f8 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -9,6 +9,9 @@ import cc.spray.Directives
 import cc.spray.directives._
 import cc.spray.typeconversion.TwirlSupport._
 import spark.deploy._
+import cc.spray.http.MediaTypes
+import JsonProtocol._
+import cc.spray.typeconversion.SprayJsonSupport._
 
 private[spark]
 class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
@@ -19,13 +22,19 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
   
   val handler = {
     get {
-      path("") {
-        completeWith {
+      (path("") & parameters('format ?)) {
+        case Some(js) if js.equalsIgnoreCase("json") =>
           val future = master ? RequestMasterState
-          future.map { 
-            masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+          respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+            ctx.complete(future.mapTo[MasterState])
+          }
+        case _ =>
+          completeWith {
+            val future = master ? RequestMasterState
+            future.map {
+              masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+            }
           }
-        }
       } ~
       path("job") {
         parameter("jobId") { jobId =>
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index d06f4884ee63ca08dd764d78bddab51f7613a677..84b6c16bd6279694bdb389f08383139608d98786 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -7,7 +7,10 @@ import akka.util.Timeout
 import akka.util.duration._
 import cc.spray.Directives
 import cc.spray.typeconversion.TwirlSupport._
-import spark.deploy.{WorkerState, RequestWorkerState}
+import spark.deploy.{JsonProtocol, WorkerState, RequestWorkerState}
+import cc.spray.http.MediaTypes
+import JsonProtocol._
+import cc.spray.typeconversion.SprayJsonSupport._
 
 private[spark]
 class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
@@ -18,13 +21,20 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
   
   val handler = {
     get {
-      path("") {
-        completeWith{
+      (path("") & parameters('format ?)) {
+        case Some(js) if js.equalsIgnoreCase("json") => {
           val future = worker ? RequestWorkerState
-          future.map { workerState =>
-            spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+          respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+            ctx.complete(future.mapTo[WorkerState])
           }
         }
+        case _ =>
+          completeWith{
+            val future = worker ? RequestWorkerState
+            future.map { workerState =>
+              spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+            }
+          }
       } ~
       path("log") {
         parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 80262ab7b4bd709326058a40b3e90875ed56d9ee..c193bf7c8d37ed748b7d40ca8c58565410abdd87 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -135,8 +135,11 @@ extends Connection(SocketChannel.open, selector_) {
           val chunk = message.getChunkForSending(defaultChunkSize)
           if (chunk.isDefined) {
             messages += message  // this is probably incorrect, it wont work as fifo
-            if (!message.started) logDebug("Starting to send [" + message + "]")
-            message.started = true
+            if (!message.started) {
+              logDebug("Starting to send [" + message + "]")
+              message.started = true
+              message.startTime = System.currentTimeMillis
+            }
             return chunk 
           } else {
             /*logInfo("Finished sending [" + message + "] to [" + remoteConnectionManagerId + "]")*/
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 642fa4b525031c4f37c89f16fca9a261990baec4..36c01ad629bbb938ca9b381748105b3db2f50c54 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -43,12 +43,12 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   }
   
   val selector = SelectorProvider.provider.openSelector()
-  val handleMessageExecutor = Executors.newFixedThreadPool(4) 
+  val handleMessageExecutor = Executors.newFixedThreadPool(System.getProperty("spark.core.connection.handler.threads","20").toInt)
   val serverChannel = ServerSocketChannel.open()
   val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection] 
   val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
   val messageStatuses = new HashMap[Int, MessageStatus] 
-  val connectionRequests = new SynchronizedQueue[SendingConnection]
+  val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
   val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   val sendMessageRequests = new Queue[(Message, SendingConnection)]
 
@@ -79,10 +79,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   def run() {
     try {
       while(!selectorThread.isInterrupted) {
-        while(!connectionRequests.isEmpty) {
-          val sendingConnection = connectionRequests.dequeue
+        for( (connectionManagerId, sendingConnection) <- connectionRequests) {
           sendingConnection.connect() 
           addConnection(sendingConnection)
+          connectionRequests -= connectionManagerId
         }
         sendMessageRequests.synchronized {
           while(!sendMessageRequests.isEmpty) {
@@ -300,8 +300,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
     def startNewConnection(): SendingConnection = {
       val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
-      val newConnection = new SendingConnection(inetSocketAddress, selector)
-      connectionRequests += newConnection
+      val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector))
       newConnection   
     }
     val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
@@ -473,6 +472,7 @@ private[spark] object ConnectionManager {
       val mb = size * count / 1024.0 / 1024.0
       val ms = finishTime - startTime
       val tput = mb * 1000.0 / ms
+      println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
       println("--------------------------")
       println()
     }
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 47ceaf3c0753488bd9be9c4831045de3892a68c9..533e4610f3a4c88c3b6f4b2dbb6503893a5cf25b 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -13,8 +13,14 @@ import akka.util.duration._
 
 private[spark] object ConnectionManagerTest extends Logging{
   def main(args: Array[String]) {
+    //<mesos cluster> - the master URL
+    //<slaves file> - a list slaves to run connectionTest on
+    //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts
+    //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10
+    //[count] - how many times to run, default is 3
+    //[await time in seconds] : await time (in seconds), default is 600
     if (args.length < 2) {
-      println("Usage: ConnectionManagerTest <mesos cluster> <slaves file>")
+      println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
       System.exit(1)
     }
     
@@ -29,16 +35,19 @@ private[spark] object ConnectionManagerTest extends Logging{
 
     /*println("Slaves")*/
     /*slaves.foreach(println)*/
-   
-    val slaveConnManagerIds = sc.parallelize(0 until slaves.length, slaves.length).map(
+    val tasknum = if (args.length > 2) args(2).toInt else slaves.length
+    val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 
+    val count = if (args.length > 4) args(4).toInt else 3
+    val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
+    println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
+    val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
         i => SparkEnv.get.connectionManager.id).collect()
     println("\nSlave ConnectionManagerIds")
     slaveConnManagerIds.foreach(println)
     println
 
-    val count = 10
     (0 until count).foreach(i => {
-      val resultStrs = sc.parallelize(0 until slaves.length, slaves.length).map(i => {
+      val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
         val connManager = SparkEnv.get.connectionManager
         val thisConnManagerId = connManager.id 
         connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { 
@@ -46,7 +55,6 @@ private[spark] object ConnectionManagerTest extends Logging{
           None
         })
 
-        val size =  100 * 1024  * 1024 
         val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
         buffer.flip
         
@@ -56,13 +64,13 @@ private[spark] object ConnectionManagerTest extends Logging{
           logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
           connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
         })
-        val results = futures.map(f => Await.result(f, 1.second))
+        val results = futures.map(f => Await.result(f, awaitTime))
         val finishTime = System.currentTimeMillis
         Thread.sleep(5000)
         
         val mb = size * results.size / 1024.0 / 1024.0
         val ms = finishTime - startTime
-        val resultStr = "Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
+        val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
         logInfo(resultStr)
         resultStr
       }).collect()
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 7c7c33131a3a73b7107e50955e0e2ffe97df6657..219674028eb7d2884986a706c121217f924a3869 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -38,6 +38,7 @@ object SparkBuild extends Build {
     scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
+    retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
     transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
     testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
 
@@ -133,6 +134,7 @@ object SparkBuild extends Build {
       "colt" % "colt" % "1.2.0",
       "cc.spray" % "spray-can" % "1.0-M2.1",
       "cc.spray" % "spray-server" % "1.0-M2.1",
+      "cc.spray" %%  "spray-json" % "1.1.1",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating"
     ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
     unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
diff --git a/run b/run
index 1528f8353404a67431ad42c2a305af9500eb7693..6cfe9631afeff15849872ddf7856189d229050ce 100755
--- a/run
+++ b/run
@@ -75,16 +75,10 @@ CLASSPATH+=":$CORE_DIR/src/main/resources"
 CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
 CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
 if [ -e "$FWDIR/lib_managed" ]; then
-  for jar in `find "$FWDIR/lib_managed/jars" -name '*jar'`; do
-    CLASSPATH+=":$jar"
-  done
-  for jar in `find "$FWDIR/lib_managed/bundles" -name '*jar'`; do
-    CLASSPATH+=":$jar"
-  done
+  CLASSPATH+=":$FWDIR/lib_managed/jars/*"
+  CLASSPATH+=":$FWDIR/lib_managed/bundles/*"
 fi
-for jar in `find "$REPL_DIR/lib" -name '*jar'`; do
-  CLASSPATH+=":$jar"
-done
+CLASSPATH+=":$REPL_DIR/lib/*"
 for jar in `find "$REPL_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
   CLASSPATH+=":$jar"
 done