Skip to content
Snippets Groups Projects
Commit 233e534a authored by Jacek Lewandowski's avatar Jacek Lewandowski Committed by Sean Owen
Browse files

[SPARK-11344] Made ApplicationDescription and DriverDescription case classes

DriverDescription refactored to case class because it included no mutable fields.

ApplicationDescription had one mutable field, which was appUiUrl. This field was set by the driver to point to the driver web UI. Master was modifying this field when the application was removed to redirect requests to history server. This was wrong because objects which are sent over the wire should be immutable. Now appUiUrl is immutable in ApplicationDescription and always points to the driver UI even if it is already shutdown. The UI url which master exposes to the user and modifies dynamically is now included into ApplicationInfo - a data object which describes the application state internally in master. That URL in ApplicationInfo is initialised with the value from ApplicationDescription.

ApplicationDescription also included value user, which is now a part of case class fields.

Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #9299 from jacek-lewandowski/SPARK-11344.
parent b86f2cab
No related branches found
No related tags found
No related merge requests found
......@@ -19,30 +19,17 @@ package org.apache.spark.deploy
import java.net.URI
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerExecutorMB: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[URI] = None,
private[spark] case class ApplicationDescription(
name: String,
maxCores: Option[Int],
memoryPerExecutorMB: Int,
command: Command,
appUiUrl: String,
eventLogDir: Option[URI] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
val eventLogCodec: Option[String] = None,
val coresPerExecutor: Option[Int] = None)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
def copy(
name: String = name,
maxCores: Option[Int] = maxCores,
memoryPerExecutorMB: Int = memoryPerExecutorMB,
command: Command = command,
appUiUrl: String = appUiUrl,
eventLogDir: Option[URI] = eventLogDir,
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
new ApplicationDescription(
name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec)
eventLogCodec: Option[String] = None,
coresPerExecutor: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>")) {
override def toString: String = "ApplicationDescription(" + name + ")"
}
......@@ -17,21 +17,12 @@
package org.apache.spark.deploy
private[deploy] class DriverDescription(
val jarUrl: String,
val mem: Int,
val cores: Int,
val supervise: Boolean,
val command: Command)
extends Serializable {
def copy(
jarUrl: String = jarUrl,
mem: Int = mem,
cores: Int = cores,
supervise: Boolean = supervise,
command: Command = command): DriverDescription =
new DriverDescription(jarUrl, mem, cores, supervise, command)
private[deploy] case class DriverDescription(
jarUrl: String,
mem: Int,
cores: Int,
supervise: Boolean,
command: Command) {
override def toString: String = s"DriverDescription (${command.mainClass})"
}
......@@ -41,6 +41,7 @@ private[spark] class ApplicationInfo(
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
@transient @volatile var appUIUrlAtHistoryServer: Option[String] = None
// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued by the
......@@ -135,4 +136,10 @@ private[spark] class ApplicationInfo(
}
}
/**
* Returns the original application UI url unless there is its address at history server
* is defined
*/
def curAppUIUrl: String = appUIUrlAtHistoryServer.getOrElse(desc.appUiUrl)
}
......@@ -768,7 +768,8 @@ private[deploy] class Master(
ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
val appId = newApplicationId(date)
new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
}
private def registerApplication(app: ApplicationInfo): Unit = {
......@@ -920,7 +921,7 @@ private[deploy] class Master(
val eventLogDir = app.desc.eventLogDir
.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
return None
}
......@@ -954,7 +955,7 @@ private[deploy] class Master(
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)
// Application UI is successfully rebuilt, so link the Master UI to it
app.desc.appUiUrl = ui.basePath
app.appUIUrlAtHistoryServer = Some(ui.basePath)
Some(ui)
} catch {
case fnf: FileNotFoundException =>
......@@ -964,7 +965,7 @@ private[deploy] class Master(
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")
None
case e: Exception =>
// Relay exception message to application UI page
......@@ -973,7 +974,8 @@ private[deploy] class Master(
var msg = s"Exception in replaying log for application $appName!"
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
app.appUIUrlAtHistoryServer =
Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
None
}
}
......
......@@ -76,7 +76,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
<li><strong><a href={app.curAppUIUrl}>Application Detail UI</a></strong></li>
</ul>
</div>
</div>
......
......@@ -206,7 +206,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
{killLink}
</td>
<td>
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
<a href={app.curAppUIUrl}>{app.desc.name}</a>
</td>
<td>
{app.coresGranted}
......
......@@ -31,8 +31,9 @@ private[deploy] object DeployTestUtils {
}
def createAppInfo() : ApplicationInfo = {
val appDesc = createAppDesc()
val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
"id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
"id", appDesc, JsonConstants.submitDate, null, Int.MaxValue)
appInfo.endTime = JsonConstants.currTimeInMillis
appInfo
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment