Skip to content
Snippets Groups Projects
Commit ea542d29 authored by Xingbo Jiang's avatar Xingbo Jiang Committed by gatorsmile
Browse files

[SPARK-19824][CORE] Update JsonProtocol to keep consistent with the UI

## What changes were proposed in this pull request?

Fix any inconsistent part in JsonProtocol with the UI.
This PR also contains the modifications in #17181

## How was this patch tested?

Updated JsonProtocolSuite.

Before this change, localhost:8080/json shows:
```
{
  "url" : "spark://xingbos-MBP.local:7077",
  "workers" : [ {
    "id" : "worker-20170615172946-192.168.0.101-49450",
    "host" : "192.168.0.101",
    "port" : 49450,
    "webuiaddress" : "http://192.168.0.101:8081",
    "cores" : 8,
    "coresused" : 8,
    "coresfree" : 0,
    "memory" : 15360,
    "memoryused" : 1024,
    "memoryfree" : 14336,
    "state" : "ALIVE",
    "lastheartbeat" : 1497519481722
  }, {
    "id" : "worker-20170615172948-192.168.0.101-49452",
    "host" : "192.168.0.101",
    "port" : 49452,
    "webuiaddress" : "http://192.168.0.101:8082",
    "cores" : 8,
    "coresused" : 8,
    "coresfree" : 0,
    "memory" : 15360,
    "memoryused" : 1024,
    "memoryfree" : 14336,
    "state" : "ALIVE",
    "lastheartbeat" : 1497519484160
  }, {
    "id" : "worker-20170615172951-192.168.0.101-49469",
    "host" : "192.168.0.101",
    "port" : 49469,
    "webuiaddress" : "http://192.168.0.101:8083",
    "cores" : 8,
    "coresused" : 8,
    "coresfree" : 0,
    "memory" : 15360,
    "memoryused" : 1024,
    "memoryfree" : 14336,
    "state" : "ALIVE",
    "lastheartbeat" : 1497519486905
  } ],
  "cores" : 24,
  "coresused" : 24,
  "memory" : 46080,
  "memoryused" : 3072,
  "activeapps" : [ {
    "starttime" : 1497519426990,
    "id" : "app-20170615173706-0001",
    "name" : "Spark shell",
    "user" : "xingbojiang",
    "memoryperslave" : 1024,
    "submitdate" : "Thu Jun 15 17:37:06 CST 2017",
    "state" : "RUNNING",
    "duration" : 65362
  } ],
  "completedapps" : [ {
    "starttime" : 1497519250893,
    "id" : "app-20170615173410-0000",
    "name" : "Spark shell",
    "user" : "xingbojiang",
    "memoryperslave" : 1024,
    "submitdate" : "Thu Jun 15 17:34:10 CST 2017",
    "state" : "FINISHED",
    "duration" : 116895
  } ],
  "activedrivers" : [ ],
  "status" : "ALIVE"
}
```

After the change:
```
{
  "url" : "spark://xingbos-MBP.local:7077",
  "workers" : [ {
    "id" : "worker-20170615175032-192.168.0.101-49951",
    "host" : "192.168.0.101",
    "port" : 49951,
    "webuiaddress" : "http://192.168.0.101:8081",
    "cores" : 8,
    "coresused" : 8,
    "coresfree" : 0,
    "memory" : 15360,
    "memoryused" : 1024,
    "memoryfree" : 14336,
    "state" : "ALIVE",
    "lastheartbeat" : 1497520292900
  }, {
    "id" : "worker-20170615175034-192.168.0.101-49953",
    "host" : "192.168.0.101",
    "port" : 49953,
    "webuiaddress" : "http://192.168.0.101:8082",
    "cores" : 8,
    "coresused" : 8,
    "coresfree" : 0,
    "memory" : 15360,
    "memoryused" : 1024,
    "memoryfree" : 14336,
    "state" : "ALIVE",
    "lastheartbeat" : 1497520280301
  }, {
    "id" : "worker-20170615175037-192.168.0.101-49955",
    "host" : "192.168.0.101",
    "port" : 49955,
    "webuiaddress" : "http://192.168.0.101:8083",
    "cores" : 8,
    "coresused" : 8,
    "coresfree" : 0,
    "memory" : 15360,
    "memoryused" : 1024,
    "memoryfree" : 14336,
    "state" : "ALIVE",
    "lastheartbeat" : 1497520282884
  } ],
  "aliveworkers" : 3,
  "cores" : 24,
  "coresused" : 24,
  "memory" : 46080,
  "memoryused" : 3072,
  "activeapps" : [ {
    "id" : "app-20170615175122-0001",
    "starttime" : 1497520282115,
    "name" : "Spark shell",
    "cores" : 24,
    "user" : "xingbojiang",
    "memoryperslave" : 1024,
    "submitdate" : "Thu Jun 15 17:51:22 CST 2017",
    "state" : "RUNNING",
    "duration" : 10805
  } ],
  "completedapps" : [ {
    "id" : "app-20170615175058-0000",
    "starttime" : 1497520258766,
    "name" : "Spark shell",
    "cores" : 24,
    "user" : "xingbojiang",
    "memoryperslave" : 1024,
    "submitdate" : "Thu Jun 15 17:50:58 CST 2017",
    "state" : "FINISHED",
    "duration" : 9876
  } ],
  "activedrivers" : [ ],
  "completeddrivers" : [ ],
  "status" : "ALIVE"
}
```

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18303 from jiangxb1987/json-protocol.
parent 112bd9bf
No related branches found
No related tags found
No related merge requests found
......@@ -21,30 +21,65 @@ import org.json4s.JsonAST.JObject
import org.json4s.JsonDSL._
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master._
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
private[deploy] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo): JObject = {
("id" -> obj.id) ~
("host" -> obj.host) ~
("port" -> obj.port) ~
("webuiaddress" -> obj.webUiAddress) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
("coresfree" -> obj.coresFree) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("memoryfree" -> obj.memoryFree) ~
("state" -> obj.state.toString) ~
("lastheartbeat" -> obj.lastHeartbeat)
}
/**
* Export the [[WorkerInfo]] to a Json object. A [[WorkerInfo]] consists of the information of a
* worker.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the worker
* `host` the host that the worker is running on
* `port` the port that the worker is bound to
* `webuiaddress` the address used in web UI
* `cores` total cores of the worker
* `coresused` allocated cores of the worker
* `coresfree` free cores of the worker
* `memory` total memory of the worker
* `memoryused` allocated memory of the worker
* `memoryfree` free memory of the worker
* `state` state of the worker, see [[WorkerState]]
* `lastheartbeat` time in milliseconds that the latest heart beat message from the
* worker is received
*/
def writeWorkerInfo(obj: WorkerInfo): JObject = {
("id" -> obj.id) ~
("host" -> obj.host) ~
("port" -> obj.port) ~
("webuiaddress" -> obj.webUiAddress) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
("coresfree" -> obj.coresFree) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("memoryfree" -> obj.memoryFree) ~
("state" -> obj.state.toString) ~
("lastheartbeat" -> obj.lastHeartbeat)
}
/**
* Export the [[ApplicationInfo]] to a Json objec. An [[ApplicationInfo]] consists of the
* information of an application.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the application
* `starttime` time in milliseconds that the application starts
* `name` the description of the application
* `cores` total cores granted to the application
* `user` name of the user who submitted the application
* `memoryperslave` minimal memory in MB required to each executor
* `submitdate` time in Date that the application is submitted
* `state` state of the application, see [[ApplicationState]]
* `duration` time in milliseconds that the application has been running
*/
def writeApplicationInfo(obj: ApplicationInfo): JObject = {
("starttime" -> obj.startTime) ~
("id" -> obj.id) ~
("starttime" -> obj.startTime) ~
("name" -> obj.desc.name) ~
("cores" -> obj.desc.maxCores) ~
("cores" -> obj.coresGranted) ~
("user" -> obj.desc.user) ~
("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
("submitdate" -> obj.submitDate.toString) ~
......@@ -52,14 +87,36 @@ private[deploy] object JsonProtocol {
("duration" -> obj.duration)
}
/**
* Export the [[ApplicationDescription]] to a Json object. An [[ApplicationDescription]] consists
* of the description of an application.
*
* @return a Json object containing the following fields:
* `name` the description of the application
* `cores` max cores that can be allocated to the application, 0 means unlimited
* `memoryperslave` minimal memory in MB required to each executor
* `user` name of the user who submitted the application
* `command` the command string used to submit the application
*/
def writeApplicationDescription(obj: ApplicationDescription): JObject = {
("name" -> obj.name) ~
("cores" -> obj.maxCores) ~
("cores" -> obj.maxCores.getOrElse(0)) ~
("memoryperslave" -> obj.memoryPerExecutorMB) ~
("user" -> obj.user) ~
("command" -> obj.command.toString)
}
/**
* Export the [[ExecutorRunner]] to a Json object. An [[ExecutorRunner]] consists of the
* information of an executor.
*
* @return a Json object containing the following fields:
* `id` an integer identifier of the executor
* `memory` memory in MB allocated to the executor
* `appid` a string identifier of the application that the executor is working on
* `appdesc` a Json object of the [[ApplicationDescription]] of the application that the
* executor is working on
*/
def writeExecutorRunner(obj: ExecutorRunner): JObject = {
("id" -> obj.execId) ~
("memory" -> obj.memory) ~
......@@ -67,18 +124,59 @@ private[deploy] object JsonProtocol {
("appdesc" -> writeApplicationDescription(obj.appDesc))
}
/**
* Export the [[DriverInfo]] to a Json object. A [[DriverInfo]] consists of the information of a
* driver.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the driver
* `starttime` time in milliseconds that the driver starts
* `state` state of the driver, see [[DriverState]]
* `cores` cores allocated to the driver
* `memory` memory in MB allocated to the driver
* `submitdate` time in Date that the driver is created
* `worker` identifier of the worker that the driver is running on
* `mainclass` main class of the command string that started the driver
*/
def writeDriverInfo(obj: DriverInfo): JObject = {
("id" -> obj.id) ~
("starttime" -> obj.startTime.toString) ~
("state" -> obj.state.toString) ~
("cores" -> obj.desc.cores) ~
("memory" -> obj.desc.mem)
("memory" -> obj.desc.mem) ~
("submitdate" -> obj.submitDate.toString) ~
("worker" -> obj.worker.map(_.id).getOrElse("None")) ~
("mainclass" -> obj.desc.command.arguments(2))
}
/**
* Export the [[MasterStateResponse]] to a Json object. A [[MasterStateResponse]] consists the
* information of a master node.
*
* @return a Json object containing the following fields:
* `url` the url of the master node
* `workers` a list of Json objects of [[WorkerInfo]] of the workers allocated to the
* master
* `aliveworkers` size of alive workers allocated to the master
* `cores` total cores available of the master
* `coresused` cores used by the master
* `memory` total memory available of the master
* `memoryused` memory used by the master
* `activeapps` a list of Json objects of [[ApplicationInfo]] of the active applications
* running on the master
* `completedapps` a list of Json objects of [[ApplicationInfo]] of the applications
* completed in the master
* `activedrivers` a list of Json objects of [[DriverInfo]] of the active drivers of the
* master
* `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers
* of the master
* `status` status of the master, see [[MasterState]]
*/
def writeMasterState(obj: MasterStateResponse): JObject = {
val aliveWorkers = obj.workers.filter(_.isAlive())
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("aliveworkers" -> aliveWorkers.length) ~
("cores" -> aliveWorkers.map(_.cores).sum) ~
("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
("memory" -> aliveWorkers.map(_.memory).sum) ~
......@@ -86,9 +184,27 @@ private[deploy] object JsonProtocol {
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~
("status" -> obj.status.toString)
}
/**
* Export the [[WorkerStateResponse]] to a Json object. A [[WorkerStateResponse]] consists the
* information of a worker node.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the worker node
* `masterurl` url of the master node of the worker
* `masterwebuiurl` the address used in web UI of the master node of the worker
* `cores` total cores of the worker
* `coreused` used cores of the worker
* `memory` total memory of the worker
* `memoryused` used memory of the worker
* `executors` a list of Json objects of [[ExecutorRunner]] of the executors running on
* the worker
* `finishedexecutors` a list of Json objects of [[ExecutorRunner]] of the finished
* executors of the worker
*/
def writeWorkerState(obj: WorkerStateResponse): JObject = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
......@@ -97,7 +213,7 @@ private[deploy] object JsonProtocol {
("coresused" -> obj.coresUsed) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
("executors" -> obj.executors.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner))
}
}
......@@ -39,7 +39,7 @@ private[deploy] object DeployTestUtils {
}
def createDriverCommand(): Command = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
"org.apache.spark.FakeClass", Seq("WORKER_URL", "USER_JAR", "mainClass"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)
......@@ -47,7 +47,7 @@ private[deploy] object DeployTestUtils {
new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())
createDriverDesc(), JsonConstants.submitDate)
def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80")
......
......@@ -104,8 +104,8 @@ object JsonConstants {
val submitDate = new Date(123456789)
val appInfoJsonStr =
"""
|{"starttime":3,"id":"id","name":"name",
|"cores":4,"user":"%s",
|{"id":"id","starttime":3,"name":"name",
|"cores":0,"user":"%s",
|"memoryperslave":1234,"submitdate":"%s",
|"state":"WAITING","duration":%d}
""".format(System.getProperty("user.name", "<unknown>"),
......@@ -134,19 +134,24 @@ object JsonConstants {
val driverInfoJsonStr =
"""
|{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100}
""".stripMargin
|{"id":"driver-3","starttime":"3",
|"state":"SUBMITTED","cores":3,"memory":100,
|"submitdate":"%s","worker":"None",
|"mainclass":"mainClass"}
""".format(submitDate.toString).stripMargin
val masterStateJsonStr =
"""
|{"url":"spark://host:8080",
|"workers":[%s,%s],
|"aliveworkers":2,
|"cores":8,"coresused":0,"memory":2468,"memoryused":0,
|"activeapps":[%s],"completedapps":[],
|"activedrivers":[%s],
|"completeddrivers":[%s],
|"status":"ALIVE"}
""".format(workerInfoJsonStr, workerInfoJsonStr,
appInfoJsonStr, driverInfoJsonStr).stripMargin
appInfoJsonStr, driverInfoJsonStr, driverInfoJsonStr).stripMargin
val workerStateJsonStr =
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment