diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 220b20bf7cbd166d4e8f4eabdf3e2963860376b4..721269616657039995981f63005757a165b3ee37 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -21,30 +21,65 @@ import org.json4s.JsonAST.JObject import org.json4s.JsonDSL._ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} -import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} +import org.apache.spark.deploy.master._ +import org.apache.spark.deploy.master.RecoveryState.MasterState import org.apache.spark.deploy.worker.ExecutorRunner private[deploy] object JsonProtocol { - def writeWorkerInfo(obj: WorkerInfo): JObject = { - ("id" -> obj.id) ~ - ("host" -> obj.host) ~ - ("port" -> obj.port) ~ - ("webuiaddress" -> obj.webUiAddress) ~ - ("cores" -> obj.cores) ~ - ("coresused" -> obj.coresUsed) ~ - ("coresfree" -> obj.coresFree) ~ - ("memory" -> obj.memory) ~ - ("memoryused" -> obj.memoryUsed) ~ - ("memoryfree" -> obj.memoryFree) ~ - ("state" -> obj.state.toString) ~ - ("lastheartbeat" -> obj.lastHeartbeat) - } + /** + * Export the [[WorkerInfo]] to a Json object. A [[WorkerInfo]] consists of the information of a + * worker. + * + * @return a Json object containing the following fields: + * `id` a string identifier of the worker + * `host` the host that the worker is running on + * `port` the port that the worker is bound to + * `webuiaddress` the address used in web UI + * `cores` total cores of the worker + * `coresused` allocated cores of the worker + * `coresfree` free cores of the worker + * `memory` total memory of the worker + * `memoryused` allocated memory of the worker + * `memoryfree` free memory of the worker + * `state` state of the worker, see [[WorkerState]] + * `lastheartbeat` time in milliseconds that the latest heart beat message from the + * worker is received + */ + def writeWorkerInfo(obj: WorkerInfo): JObject = { + ("id" -> obj.id) ~ + ("host" -> obj.host) ~ + ("port" -> obj.port) ~ + ("webuiaddress" -> obj.webUiAddress) ~ + ("cores" -> obj.cores) ~ + ("coresused" -> obj.coresUsed) ~ + ("coresfree" -> obj.coresFree) ~ + ("memory" -> obj.memory) ~ + ("memoryused" -> obj.memoryUsed) ~ + ("memoryfree" -> obj.memoryFree) ~ + ("state" -> obj.state.toString) ~ + ("lastheartbeat" -> obj.lastHeartbeat) + } + /** + * Export the [[ApplicationInfo]] to a Json objec. An [[ApplicationInfo]] consists of the + * information of an application. + * + * @return a Json object containing the following fields: + * `id` a string identifier of the application + * `starttime` time in milliseconds that the application starts + * `name` the description of the application + * `cores` total cores granted to the application + * `user` name of the user who submitted the application + * `memoryperslave` minimal memory in MB required to each executor + * `submitdate` time in Date that the application is submitted + * `state` state of the application, see [[ApplicationState]] + * `duration` time in milliseconds that the application has been running + */ def writeApplicationInfo(obj: ApplicationInfo): JObject = { - ("starttime" -> obj.startTime) ~ ("id" -> obj.id) ~ + ("starttime" -> obj.startTime) ~ ("name" -> obj.desc.name) ~ - ("cores" -> obj.desc.maxCores) ~ + ("cores" -> obj.coresGranted) ~ ("user" -> obj.desc.user) ~ ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~ ("submitdate" -> obj.submitDate.toString) ~ @@ -52,14 +87,36 @@ private[deploy] object JsonProtocol { ("duration" -> obj.duration) } + /** + * Export the [[ApplicationDescription]] to a Json object. An [[ApplicationDescription]] consists + * of the description of an application. + * + * @return a Json object containing the following fields: + * `name` the description of the application + * `cores` max cores that can be allocated to the application, 0 means unlimited + * `memoryperslave` minimal memory in MB required to each executor + * `user` name of the user who submitted the application + * `command` the command string used to submit the application + */ def writeApplicationDescription(obj: ApplicationDescription): JObject = { ("name" -> obj.name) ~ - ("cores" -> obj.maxCores) ~ + ("cores" -> obj.maxCores.getOrElse(0)) ~ ("memoryperslave" -> obj.memoryPerExecutorMB) ~ ("user" -> obj.user) ~ ("command" -> obj.command.toString) } + /** + * Export the [[ExecutorRunner]] to a Json object. An [[ExecutorRunner]] consists of the + * information of an executor. + * + * @return a Json object containing the following fields: + * `id` an integer identifier of the executor + * `memory` memory in MB allocated to the executor + * `appid` a string identifier of the application that the executor is working on + * `appdesc` a Json object of the [[ApplicationDescription]] of the application that the + * executor is working on + */ def writeExecutorRunner(obj: ExecutorRunner): JObject = { ("id" -> obj.execId) ~ ("memory" -> obj.memory) ~ @@ -67,18 +124,59 @@ private[deploy] object JsonProtocol { ("appdesc" -> writeApplicationDescription(obj.appDesc)) } + /** + * Export the [[DriverInfo]] to a Json object. A [[DriverInfo]] consists of the information of a + * driver. + * + * @return a Json object containing the following fields: + * `id` a string identifier of the driver + * `starttime` time in milliseconds that the driver starts + * `state` state of the driver, see [[DriverState]] + * `cores` cores allocated to the driver + * `memory` memory in MB allocated to the driver + * `submitdate` time in Date that the driver is created + * `worker` identifier of the worker that the driver is running on + * `mainclass` main class of the command string that started the driver + */ def writeDriverInfo(obj: DriverInfo): JObject = { ("id" -> obj.id) ~ ("starttime" -> obj.startTime.toString) ~ ("state" -> obj.state.toString) ~ ("cores" -> obj.desc.cores) ~ - ("memory" -> obj.desc.mem) + ("memory" -> obj.desc.mem) ~ + ("submitdate" -> obj.submitDate.toString) ~ + ("worker" -> obj.worker.map(_.id).getOrElse("None")) ~ + ("mainclass" -> obj.desc.command.arguments(2)) } + /** + * Export the [[MasterStateResponse]] to a Json object. A [[MasterStateResponse]] consists the + * information of a master node. + * + * @return a Json object containing the following fields: + * `url` the url of the master node + * `workers` a list of Json objects of [[WorkerInfo]] of the workers allocated to the + * master + * `aliveworkers` size of alive workers allocated to the master + * `cores` total cores available of the master + * `coresused` cores used by the master + * `memory` total memory available of the master + * `memoryused` memory used by the master + * `activeapps` a list of Json objects of [[ApplicationInfo]] of the active applications + * running on the master + * `completedapps` a list of Json objects of [[ApplicationInfo]] of the applications + * completed in the master + * `activedrivers` a list of Json objects of [[DriverInfo]] of the active drivers of the + * master + * `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers + * of the master + * `status` status of the master, see [[MasterState]] + */ def writeMasterState(obj: MasterStateResponse): JObject = { val aliveWorkers = obj.workers.filter(_.isAlive()) ("url" -> obj.uri) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ + ("aliveworkers" -> aliveWorkers.length) ~ ("cores" -> aliveWorkers.map(_.cores).sum) ~ ("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~ ("memory" -> aliveWorkers.map(_.memory).sum) ~ @@ -86,9 +184,27 @@ private[deploy] object JsonProtocol { ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~ ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~ + ("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~ ("status" -> obj.status.toString) } + /** + * Export the [[WorkerStateResponse]] to a Json object. A [[WorkerStateResponse]] consists the + * information of a worker node. + * + * @return a Json object containing the following fields: + * `id` a string identifier of the worker node + * `masterurl` url of the master node of the worker + * `masterwebuiurl` the address used in web UI of the master node of the worker + * `cores` total cores of the worker + * `coreused` used cores of the worker + * `memory` total memory of the worker + * `memoryused` used memory of the worker + * `executors` a list of Json objects of [[ExecutorRunner]] of the executors running on + * the worker + * `finishedexecutors` a list of Json objects of [[ExecutorRunner]] of the finished + * executors of the worker + */ def writeWorkerState(obj: WorkerStateResponse): JObject = { ("id" -> obj.workerId) ~ ("masterurl" -> obj.masterUrl) ~ @@ -97,7 +213,7 @@ private[deploy] object JsonProtocol { ("coresused" -> obj.coresUsed) ~ ("memory" -> obj.memory) ~ ("memoryused" -> obj.memoryUsed) ~ - ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~ - ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner)) + ("executors" -> obj.executors.map(writeExecutorRunner)) ~ + ("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner)) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala index 9c13c15281a423b21038d3d57714ec27fc12b8b9..55a541d60ea3cf348db380b215f3cfb24ea533c2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala @@ -39,7 +39,7 @@ private[deploy] object DeployTestUtils { } def createDriverCommand(): Command = new Command( - "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"), + "org.apache.spark.FakeClass", Seq("WORKER_URL", "USER_JAR", "mainClass"), Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo") ) @@ -47,7 +47,7 @@ private[deploy] object DeployTestUtils { new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand()) def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", - createDriverDesc(), new Date()) + createDriverDesc(), JsonConstants.submitDate) def createWorkerInfo(): WorkerInfo = { val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80") diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 7093dad05c5f6533b453b7bfeca8317161c6d1a0..1903130cb694af7400296e3135e9a2b2d161b4bb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -104,8 +104,8 @@ object JsonConstants { val submitDate = new Date(123456789) val appInfoJsonStr = """ - |{"starttime":3,"id":"id","name":"name", - |"cores":4,"user":"%s", + |{"id":"id","starttime":3,"name":"name", + |"cores":0,"user":"%s", |"memoryperslave":1234,"submitdate":"%s", |"state":"WAITING","duration":%d} """.format(System.getProperty("user.name", "<unknown>"), @@ -134,19 +134,24 @@ object JsonConstants { val driverInfoJsonStr = """ - |{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100} - """.stripMargin + |{"id":"driver-3","starttime":"3", + |"state":"SUBMITTED","cores":3,"memory":100, + |"submitdate":"%s","worker":"None", + |"mainclass":"mainClass"} + """.format(submitDate.toString).stripMargin val masterStateJsonStr = """ |{"url":"spark://host:8080", |"workers":[%s,%s], + |"aliveworkers":2, |"cores":8,"coresused":0,"memory":2468,"memoryused":0, |"activeapps":[%s],"completedapps":[], |"activedrivers":[%s], + |"completeddrivers":[%s], |"status":"ALIVE"} """.format(workerInfoJsonStr, workerInfoJsonStr, - appInfoJsonStr, driverInfoJsonStr).stripMargin + appInfoJsonStr, driverInfoJsonStr, driverInfoJsonStr).stripMargin val workerStateJsonStr = """