From 91ec5a1a04339983d57a72d8df8f1d769d8d855a Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Tue, 18 Jun 2013 15:13:12 -0700
Subject: [PATCH] Changing JSON protocol and removing spray code

---
 .../scala/spark/deploy/JsonProtocol.scala      | 114 ++++++++----
 .../spark/deploy/master/MasterWebUI.scala      |   6 +-
 .../spark/deploy/worker/WorkerWebUI.scala      |   6 +-
 .../scala/spark/storage/BlockManagerUI.scala   |  11 +-
 .../src/main/scala/spark/util/AkkaUtils.scala  |  15 +--
 core/src/main/scala/spark/util/WebUI.scala     |  12 +-
 project/SparkBuild.scala                       |   7 +-
 project/plugins.sbt                            |   2 -
 8 files changed, 66 insertions(+), 107 deletions(-)

diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index ea832101d2..b4365d31e9 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -1,79 +1,65 @@
 package spark.deploy
 
 import master.{ApplicationInfo, WorkerInfo}
+import net.liftweb.json.JsonDSL._
 import worker.ExecutorRunner
-import cc.spray.json._
 
-/**
- * spray-json helper class containing implicit conversion to json for marshalling responses
- */
-private[spark] object JsonProtocol extends DefaultJsonProtocol {
-  implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] {
-    def write(obj: WorkerInfo) = JsObject(
-      "id" -> JsString(obj.id),
-      "host" -> JsString(obj.host),
-      "port" -> JsNumber(obj.port),
-      "webuiaddress" -> JsString(obj.webUiAddress),
-      "cores" -> JsNumber(obj.cores),
-      "coresused" -> JsNumber(obj.coresUsed),
-      "memory" -> JsNumber(obj.memory),
-      "memoryused" -> JsNumber(obj.memoryUsed)
-    )
-  }
+object JsonProtocol {
+  def writeWorkerInfo(obj: WorkerInfo) = {
+    ("id" -> obj.id) ~
+    ("host" -> obj.host) ~
+    ("port" -> obj.port) ~
+    ("webuiaddress" -> obj.webUiAddress) ~
+    ("cores" -> obj.cores) ~
+    ("coresused" -> obj.coresUsed) ~
+    ("memory" -> obj.memory) ~
+    ("memoryused" -> obj.memoryUsed)
+  }
 
-  implicit object AppInfoJsonFormat extends RootJsonWriter[ApplicationInfo] {
-    def write(obj: ApplicationInfo) = JsObject(
-      "starttime" -> JsNumber(obj.startTime),
-      "id" -> JsString(obj.id),
-      "name" -> JsString(obj.desc.name),
-      "cores" -> JsNumber(obj.desc.maxCores),
-      "user" -> JsString(obj.desc.user),
-      "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave),
-      "submitdate" -> JsString(obj.submitDate.toString))
+  def writeApplicationInfo(obj: ApplicationInfo) = {
+    ("starttime" -> obj.startTime) ~
+    ("id" -> obj.id) ~
+    ("name" -> obj.desc.name) ~
+    ("cores" -> obj.desc.maxCores) ~
+    ("user" -> obj.desc.user) ~
+    ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+    ("submitdate" -> obj.submitDate.toString)
   }
 
-  implicit object AppDescriptionJsonFormat extends RootJsonWriter[ApplicationDescription] {
-    def write(obj: ApplicationDescription) = JsObject(
-      "name" -> JsString(obj.name),
-      "cores" -> JsNumber(obj.maxCores),
-      "memoryperslave" -> JsNumber(obj.memoryPerSlave),
-      "user" -> JsString(obj.user)
-    )
+  def writeApplicationDescription(obj: ApplicationDescription) = {
+    ("name" -> obj.name) ~
+    ("cores" -> obj.maxCores) ~
+    ("memoryperslave" -> obj.memoryPerSlave) ~
+    ("user" -> obj.user)
   }
 
-  implicit object ExecutorRunnerJsonFormat extends RootJsonWriter[ExecutorRunner] {
-    def write(obj: ExecutorRunner) = JsObject(
-      "id" -> JsNumber(obj.execId),
-      "memory" -> JsNumber(obj.memory),
-      "appid" -> JsString(obj.appId),
-      "appdesc" -> obj.appDesc.toJson.asJsObject
-    )
+  def writeExecutorRunner(obj: ExecutorRunner) = {
+    ("id" -> obj.execId) ~
+    ("memory" -> obj.memory) ~
+    ("appid" -> obj.appId) ~
+    ("appdesc" -> writeApplicationDescription(obj.appDesc))
   }
 
-  implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] {
-    def write(obj: MasterState) = JsObject(
-      "url" -> JsString("spark://" + obj.uri),
-      "workers" -> JsArray(obj.workers.toList.map(_.toJson)),
-      "cores" -> JsNumber(obj.workers.map(_.cores).sum),
-      "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum),
-      "memory" -> JsNumber(obj.workers.map(_.memory).sum),
-      "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum),
-      "activeapps" -> JsArray(obj.activeApps.toList.map(_.toJson)),
-      "completedapps" -> JsArray(obj.completedApps.toList.map(_.toJson))
-    )
+  def writeMasterState(obj: MasterState) = {
+    ("url" -> ("spark://" + obj.uri)) ~
+    ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
+    ("cores" -> obj.workers.map(_.cores).sum) ~
+    ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
+    ("memory" -> obj.workers.map(_.memory).sum) ~
+    ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
+    ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
+    ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
   }
 
-  implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] {
-    def write(obj: WorkerState) = JsObject(
-      "id" -> JsString(obj.workerId),
-      "masterurl" -> JsString(obj.masterUrl),
-      "masterwebuiurl" -> JsString(obj.masterWebUiUrl),
-      "cores" -> JsNumber(obj.cores),
-      "coresused" -> JsNumber(obj.coresUsed),
-      "memory" -> JsNumber(obj.memory),
-      "memoryused" -> JsNumber(obj.memoryUsed),
-      "executors" -> JsArray(obj.executors.toList.map(_.toJson)),
-      "finishedexecutors" -> JsArray(obj.finishedExecutors.toList.map(_.toJson))
-    )
+  def writeWorkerState(obj: WorkerState) = {
+    ("id" -> obj.workerId) ~
+    ("masterurl" -> obj.masterUrl) ~
+    ("masterwebuiurl" -> obj.masterWebUiUrl) ~
+    ("cores" -> obj.cores) ~
+    ("coresused" -> obj.coresUsed) ~
+    ("memory" -> obj.memory) ~
+    ("memoryused" -> obj.memoryUsed) ~
+    ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
+    ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
   }
-}
+}
\ No newline at end of file
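Note: the new writers above rely on lift-json's JsonDSL, where each (key -> value) pair is implicitly lifted to a JSON field and the ~ operator concatenates fields into a JObject; render plus pretty/compact then turn a JValue into text. A minimal sketch of that behavior follows; the object name and field values are invented for illustration and are not part of this patch.

    import net.liftweb.json.JsonDSL._
    import net.liftweb.json._

    object JsonDslSketch {
      def main(args: Array[String]) {
        // ~ chains (key -> value) pairs into a single JObject, the same way
        // writeWorkerInfo assembles its fields from a WorkerInfo.
        val worker =
          ("id" -> "worker-example") ~
          ("cores" -> 8) ~
          ("memory" -> 16384)
        // render produces a printable document; pretty/compact turn it into a string.
        println(pretty(render(worker)))
        println(compact(render(worker)))  // {"id":"worker-example","cores":8,"memory":16384}
      }
    }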
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index a2e9dfd762..6623142d69 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -55,9 +55,9 @@ class MasterWebUI(master: ActorRef) extends Logging {
       <div class="row">
         <div class="span12">
           <ul class="unstyled">
-            <li><strong>ID:</strong> app.id</li>
-            <li><strong>Description:</strong> app.desc.name</li>
-            <li><strong>User:</strong> app.desc.user</li>
+            <li><strong>ID:</strong> {app.id}</li>
+            <li><strong>Description:</strong> {app.desc.name}</li>
+            <li><strong>User:</strong> {app.desc.user}</li>
             <li><strong>Cores:</strong>
             {
               if (app.desc.maxCores == Integer.MAX_VALUE) {
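The MasterWebUI hunk above is a straight bug fix: inside a Scala XML literal, bare text is emitted verbatim, and only an expression wrapped in braces is spliced in. A small illustration, with an invented appId value:

    val appId = "app-20130618-0000"
    val wrong = <li><strong>ID:</strong> app.id</li>   // renders the literal text "app.id"
    val right = <li><strong>ID:</strong> {appId}</li>  // renders "app-20130618-0000"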
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index b8b4b89738..0af9eb8efa 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -1,16 +1,12 @@
 package spark.deploy.worker
 
-import akka.actor.{ActorRef, ActorSystem}
+import akka.actor.ActorRef
 import akka.dispatch.Await
 import akka.pattern.ask
 import akka.util.{Duration, Timeout}
 import akka.util.duration._
-import cc.spray.Directives
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.http.MediaTypes
 import spark.deploy.{WorkerState, RequestWorkerState}
-import spark.deploy.JsonProtocol._
 import java.io.File
 import spark.util.{WebUI => UtilsWebUI}
 import spark.{Utils, Logging}
 
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 6ac4398de7..e9c362fce7 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -3,18 +3,12 @@ package spark.storage
 import akka.actor.{ActorRef, ActorSystem}
 import akka.util.Duration
 import akka.util.duration._
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.Directives
 import spark.{Logging, SparkContext}
-import spark.util.AkkaUtils
 import spark.Utils
 import spark.util.WebUI
-import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, ResourceHandler}
 import org.eclipse.jetty.server.Handler
-import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
-import xml.Elem
+import javax.servlet.http.HttpServletRequest
 import xml.Node
-import java.net.URLClassLoader
 import spark.util.WebUI._
@@ -23,7 +17,7 @@ import spark.util.WebUI._
  */
 private[spark]
 class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
-  extends Directives with Logging {
+  extends Logging {
 
   implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
   val host = Utils.localHostName()
@@ -55,7 +49,6 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
     val filteredStorageStatusList = StorageUtils.
       filterStorageStatusByPrefix(storageStatusList, prefix)
     val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-    spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
 
     val content =
       <div class="row">
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index bd2d637ae7..134c912c46 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,21 +1,10 @@
 package spark.util
 
-import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{ActorSystemImpl, ActorSystem}
 import com.typesafe.config.ConfigFactory
 import akka.util.duration._
-import akka.pattern.ask
 import akka.remote.RemoteActorRefProvider
-import cc.spray.Route
-import cc.spray.io.IoWorker
-import cc.spray.{SprayCanRootService, HttpService}
-import cc.spray.can.server.HttpServer
-import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler
-import akka.dispatch.Await
-import spark.{Utils, SparkException}
-import java.util.concurrent.TimeoutException
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.server.Handler
-import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler}
+
 
 /**
  * Various utility classes for working with Akka.
diff --git a/core/src/main/scala/spark/util/WebUI.scala b/core/src/main/scala/spark/util/WebUI.scala
index 34b776f1d8..e6b39b15eb 100644
--- a/core/src/main/scala/spark/util/WebUI.scala
+++ b/core/src/main/scala/spark/util/WebUI.scala
@@ -1,23 +1,22 @@
 package spark.util
 
-import xml.Elem
 import xml.Node
-import util.parsing.json.{JSONFormat, JSONObject}
 import org.eclipse.jetty.server.{Server, Request, Handler}
 import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
-import org.eclipse.jetty.util.component.LifeCycle.Listener
 import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler}
 import util.Try
 import util.Success
 import util.Failure
 import spark.Logging
 import annotation.tailrec
+import net.liftweb.json.JsonAST.JValue
+import net.liftweb.json._
 
 object WebUI extends Logging {
   type Responder[T] = HttpServletRequest => T
 
-  implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler =
-    createHandler(responder, "text/json")
+  implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
+    createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
 
   implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
     createHandler(responder, "text/html")
@@ -25,7 +24,8 @@ object WebUI extends Logging {
   implicit def textResponderToHandler(responder: Responder[String]): Handler =
     createHandler(responder, "text/plain")
 
-  def createHandler[T <% AnyRef](responder: Responder[T], contentType: String): Handler = {
+  def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+    extractFn: T => String = (in: Any) => in.toString): Handler = {
     new AbstractHandler {
       def handle(target: String,
                  baseRequest: Request,
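With the WebUI changes above, a JSON endpoint is just a function from HttpServletRequest to a lift-json JValue; the implicit jsonResponderToHandler wraps it in a Jetty Handler that replies with content type "text/json" and pretty-printed output. A sketch of the intended usage, where the status payload and port are invented and not part of the patch:

    import javax.servlet.http.HttpServletRequest
    import net.liftweb.json.JsonDSL._
    import net.liftweb.json.JsonAST.JValue
    import org.eclipse.jetty.server.Handler
    import spark.util.WebUI._

    // A responder is just a function; the payload here is made up.
    val jsonStatus: HttpServletRequest => JValue = (request: HttpServletRequest) =>
      ("status" -> "alive") ~ ("port" -> 8080)

    // jsonResponderToHandler converts the function into a Jetty Handler that
    // serializes the JValue with pretty(render(...)) and serves "text/json".
    val handler: Handler = jsonStatus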
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index faf6e2ae8e..ec26b2a229 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -4,7 +4,6 @@ import sbt.Classpaths.publishTask
 import Keys._
 import sbtassembly.Plugin._
 import AssemblyKeys._
-import twirl.sbt.TwirlPlugin._
 
 // For Sonatype publishing
 //import com.jsuereth.pgp.sbtplugin.PgpKeys._
@@ -157,9 +156,7 @@ object SparkBuild extends Build {
     "com.typesafe.akka" % "akka-slf4j" % "2.0.3" excludeAll(excludeNetty),
     "it.unimi.dsi" % "fastutil" % "6.4.4",
     "colt" % "colt" % "1.2.0",
-    "cc.spray" % "spray-can" % "1.0-M2.1" excludeAll(excludeNetty),
-    "cc.spray" % "spray-server" % "1.0-M2.1" excludeAll(excludeNetty),
-    "cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
+    "net.liftweb" % "lift-json_2.9.2" % "2.5",
     "org.apache.mesos" % "mesos" % "0.9.0-incubating",
     "io.netty" % "netty-all" % "4.0.0.Beta2",
     "org.apache.derby" % "derby" % "10.4.2.0" % "test"
@@ -189,7 +186,7 @@ object SparkBuild extends Build {
         "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
       }
     )
   }
-  ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
+  ) ++ assemblySettings ++ extraAssemblySettings
 
   def rootSettings = sharedSettings ++ Seq(
     publish := {}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index f806e66481..1b0f879b94 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -10,8 +10,6 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
 
 addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
 
-addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1")
-
 // For Sonatype publishing
 //resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
 
--
GitLab