Skip to content
Snippets Groups Projects
Commit 91ec5a1a authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Changing JSON protocol and removing spray code

parent fc94576e
No related branches found
No related tags found
No related merge requests found
package spark.deploy package spark.deploy
import master.{ApplicationInfo, WorkerInfo} import master.{ApplicationInfo, WorkerInfo}
import net.liftweb.json.JsonDSL._
import worker.ExecutorRunner import worker.ExecutorRunner
import cc.spray.json._
/** object JsonProtocol {
* spray-json helper class containing implicit conversion to json for marshalling responses def writeWorkerInfo(obj: WorkerInfo) = {
*/ ("id" -> obj.id) ~
private[spark] object JsonProtocol extends DefaultJsonProtocol { ("host" -> obj.host) ~
implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] { ("port" -> obj.port) ~
def write(obj: WorkerInfo) = JsObject( ("webuiaddress" -> obj.webUiAddress) ~
"id" -> JsString(obj.id), ("cores" -> obj.cores) ~
"host" -> JsString(obj.host), ("coresused" -> obj.coresUsed) ~
"port" -> JsNumber(obj.port), ("memory" -> obj.memory) ~
"webuiaddress" -> JsString(obj.webUiAddress), ("memoryused" -> obj.memoryUsed)
"cores" -> JsNumber(obj.cores), }
"coresused" -> JsNumber(obj.coresUsed),
"memory" -> JsNumber(obj.memory),
"memoryused" -> JsNumber(obj.memoryUsed)
)
}
implicit object AppInfoJsonFormat extends RootJsonWriter[ApplicationInfo] { def writeApplicationInfo(obj: ApplicationInfo) = {
def write(obj: ApplicationInfo) = JsObject( ("starttime" -> obj.startTime) ~
"starttime" -> JsNumber(obj.startTime), ("id" -> obj.id) ~
"id" -> JsString(obj.id), ("name" -> obj.desc.name) ~
"name" -> JsString(obj.desc.name), ("cores" -> obj.desc.maxCores) ~
"cores" -> JsNumber(obj.desc.maxCores), ("user" -> obj.desc.user) ~
"user" -> JsString(obj.desc.user), ("memoryperslave" -> obj.desc.memoryPerSlave) ~
"memoryperslave" -> JsNumber(obj.desc.memoryPerSlave), ("submitdate" -> obj.submitDate.toString)
"submitdate" -> JsString(obj.submitDate.toString))
} }
implicit object AppDescriptionJsonFormat extends RootJsonWriter[ApplicationDescription] { def writeApplicationDescription(obj: ApplicationDescription) = {
def write(obj: ApplicationDescription) = JsObject( ("name" -> obj.name) ~
"name" -> JsString(obj.name), ("cores" -> obj.maxCores) ~
"cores" -> JsNumber(obj.maxCores), ("memoryperslave" -> obj.memoryPerSlave) ~
"memoryperslave" -> JsNumber(obj.memoryPerSlave), ("user" -> obj.user)
"user" -> JsString(obj.user)
)
} }
implicit object ExecutorRunnerJsonFormat extends RootJsonWriter[ExecutorRunner] { def writeExecutorRunner(obj: ExecutorRunner) = {
def write(obj: ExecutorRunner) = JsObject( ("id" -> obj.execId) ~
"id" -> JsNumber(obj.execId), ("memory" -> obj.memory) ~
"memory" -> JsNumber(obj.memory), ("appid" -> obj.appId) ~
"appid" -> JsString(obj.appId), ("appdesc" -> writeApplicationDescription(obj.appDesc))
"appdesc" -> obj.appDesc.toJson.asJsObject
)
} }
implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] { def writeMasterState(obj: MasterState) = {
def write(obj: MasterState) = JsObject( ("url" -> ("spark://" + obj.uri)) ~
"url" -> JsString("spark://" + obj.uri), ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
"workers" -> JsArray(obj.workers.toList.map(_.toJson)), ("cores" -> obj.workers.map(_.cores).sum) ~
"cores" -> JsNumber(obj.workers.map(_.cores).sum), ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
"coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum), ("memory" -> obj.workers.map(_.memory).sum) ~
"memory" -> JsNumber(obj.workers.map(_.memory).sum), ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
"memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum), ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
"activeapps" -> JsArray(obj.activeApps.toList.map(_.toJson)), ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
"completedapps" -> JsArray(obj.completedApps.toList.map(_.toJson))
)
} }
implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] { def writeWorkerState(obj: WorkerState) = {
def write(obj: WorkerState) = JsObject( ("id" -> obj.workerId) ~
"id" -> JsString(obj.workerId), ("masterurl" -> obj.masterUrl) ~
"masterurl" -> JsString(obj.masterUrl), ("masterwebuiurl" -> obj.masterWebUiUrl) ~
"masterwebuiurl" -> JsString(obj.masterWebUiUrl), ("cores" -> obj.cores) ~
"cores" -> JsNumber(obj.cores), ("coresused" -> obj.coresUsed) ~
"coresused" -> JsNumber(obj.coresUsed), ("memory" -> obj.memory) ~
"memory" -> JsNumber(obj.memory), ("memoryused" -> obj.memoryUsed) ~
"memoryused" -> JsNumber(obj.memoryUsed), ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
"executors" -> JsArray(obj.executors.toList.map(_.toJson)), ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
"finishedexecutors" -> JsArray(obj.finishedExecutors.toList.map(_.toJson))
)
} }
} }
\ No newline at end of file
...@@ -55,9 +55,9 @@ class MasterWebUI(master: ActorRef) extends Logging { ...@@ -55,9 +55,9 @@ class MasterWebUI(master: ActorRef) extends Logging {
<div class="row"> <div class="row">
<div class="span12"> <div class="span12">
<ul class="unstyled"> <ul class="unstyled">
<li><strong>ID:</strong> app.id</li> <li><strong>ID:</strong> {app.id}</li>
<li><strong>Description:</strong> app.desc.name</li> <li><strong>Description:</strong> {app.desc.name}</li>
<li><strong>User:</strong> app.desc.user</li> <li><strong>User:</strong> {app.desc.user}</li>
<li><strong>Cores:</strong> <li><strong>Cores:</strong>
{ {
if (app.desc.maxCores == Integer.MAX_VALUE) { if (app.desc.maxCores == Integer.MAX_VALUE) {
......
package spark.deploy.worker package spark.deploy.worker
import akka.actor.{ActorRef, ActorSystem} import akka.actor.ActorRef
import akka.dispatch.Await import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
import akka.util.{Duration, Timeout} import akka.util.{Duration, Timeout}
import akka.util.duration._ import akka.util.duration._
import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
import cc.spray.http.MediaTypes
import spark.deploy.{WorkerState, RequestWorkerState} import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._
import java.io.File import java.io.File
import spark.util.{WebUI => UtilsWebUI} import spark.util.{WebUI => UtilsWebUI}
import spark.{Utils, Logging} import spark.{Utils, Logging}
......
...@@ -3,18 +3,12 @@ package spark.storage ...@@ -3,18 +3,12 @@ package spark.storage
import akka.actor.{ActorRef, ActorSystem} import akka.actor.{ActorRef, ActorSystem}
import akka.util.Duration import akka.util.Duration
import akka.util.duration._ import akka.util.duration._
import cc.spray.typeconversion.TwirlSupport._
import cc.spray.Directives
import spark.{Logging, SparkContext} import spark.{Logging, SparkContext}
import spark.util.AkkaUtils
import spark.Utils import spark.Utils
import spark.util.WebUI import spark.util.WebUI
import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, ResourceHandler}
import org.eclipse.jetty.server.Handler import org.eclipse.jetty.server.Handler
import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import javax.servlet.http.HttpServletRequest
import xml.Elem
import xml.Node import xml.Node
import java.net.URLClassLoader
import spark.util.WebUI._ import spark.util.WebUI._
...@@ -23,7 +17,7 @@ import spark.util.WebUI._ ...@@ -23,7 +17,7 @@ import spark.util.WebUI._
*/ */
private[spark] private[spark]
class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext) class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
extends Directives with Logging { extends Logging {
implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val host = Utils.localHostName() val host = Utils.localHostName()
...@@ -55,7 +49,6 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, ...@@ -55,7 +49,6 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
val filteredStorageStatusList = StorageUtils. val filteredStorageStatusList = StorageUtils.
filterStorageStatusByPrefix(storageStatusList, prefix) filterStorageStatusByPrefix(storageStatusList, prefix)
val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
val content = val content =
<div class="row"> <div class="row">
......
package spark.util package spark.util
import akka.actor.{ActorRef, Props, ActorSystemImpl, ActorSystem} import akka.actor.{ActorSystemImpl, ActorSystem}
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.util.duration._ import akka.util.duration._
import akka.pattern.ask
import akka.remote.RemoteActorRefProvider import akka.remote.RemoteActorRefProvider
import cc.spray.Route
import cc.spray.io.IoWorker
import cc.spray.{SprayCanRootService, HttpService}
import cc.spray.can.server.HttpServer
import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler
import akka.dispatch.Await
import spark.{Utils, SparkException}
import java.util.concurrent.TimeoutException
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.Handler
import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler}
/** /**
* Various utility classes for working with Akka. * Various utility classes for working with Akka.
......
package spark.util package spark.util
import xml.Elem
import xml.Node import xml.Node
import util.parsing.json.{JSONFormat, JSONObject}
import org.eclipse.jetty.server.{Server, Request, Handler} import org.eclipse.jetty.server.{Server, Request, Handler}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import org.eclipse.jetty.util.component.LifeCycle.Listener
import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler} import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler}
import util.Try import util.Try
import util.Success import util.Success
import util.Failure import util.Failure
import spark.Logging import spark.Logging
import annotation.tailrec import annotation.tailrec
import net.liftweb.json.JsonAST.JValue
import net.liftweb.json._
object WebUI extends Logging { object WebUI extends Logging {
type Responder[T] = HttpServletRequest => T type Responder[T] = HttpServletRequest => T
implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler = implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
createHandler(responder, "text/json") createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler = implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
createHandler(responder, "text/html") createHandler(responder, "text/html")
...@@ -25,7 +24,8 @@ object WebUI extends Logging { ...@@ -25,7 +24,8 @@ object WebUI extends Logging {
implicit def textResponderToHandler(responder: Responder[String]): Handler = implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain") createHandler(responder, "text/plain")
def createHandler[T <% AnyRef](responder: Responder[T], contentType: String): Handler = { def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
extractFn: T => String = (in: Any) => in.toString): Handler = {
new AbstractHandler { new AbstractHandler {
def handle(target: String, def handle(target: String,
baseRequest: Request, baseRequest: Request,
......
...@@ -4,7 +4,6 @@ import sbt.Classpaths.publishTask ...@@ -4,7 +4,6 @@ import sbt.Classpaths.publishTask
import Keys._ import Keys._
import sbtassembly.Plugin._ import sbtassembly.Plugin._
import AssemblyKeys._ import AssemblyKeys._
import twirl.sbt.TwirlPlugin._
// For Sonatype publishing // For Sonatype publishing
//import com.jsuereth.pgp.sbtplugin.PgpKeys._ //import com.jsuereth.pgp.sbtplugin.PgpKeys._
...@@ -157,9 +156,7 @@ object SparkBuild extends Build { ...@@ -157,9 +156,7 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-slf4j" % "2.0.3" excludeAll(excludeNetty), "com.typesafe.akka" % "akka-slf4j" % "2.0.3" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4", "it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0", "colt" % "colt" % "1.2.0",
"cc.spray" % "spray-can" % "1.0-M2.1" excludeAll(excludeNetty), "net.liftweb" % "lift-json_2.9.2" % "2.5",
"cc.spray" % "spray-server" % "1.0-M2.1" excludeAll(excludeNetty),
"cc.spray" % "spray-json_2.9.2" % "1.1.1" excludeAll(excludeNetty),
"org.apache.mesos" % "mesos" % "0.9.0-incubating", "org.apache.mesos" % "mesos" % "0.9.0-incubating",
"io.netty" % "netty-all" % "4.0.0.Beta2", "io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test" "org.apache.derby" % "derby" % "10.4.2.0" % "test"
...@@ -189,7 +186,7 @@ object SparkBuild extends Build { ...@@ -189,7 +186,7 @@ object SparkBuild extends Build {
"src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
} ) } )
} }
) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings ) ++ assemblySettings ++ extraAssemblySettings
def rootSettings = sharedSettings ++ Seq( def rootSettings = sharedSettings ++ Seq(
publish := {} publish := {}
......
...@@ -10,8 +10,6 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1") ...@@ -10,8 +10,6 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0") addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1")
// For Sonatype publishing // For Sonatype publishing
//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns) //resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment