diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cb4fb7cfbd32f5f193443381e7a7e8396a2de148..529febff94196f80da8c75ebfb010a7fb8285fa4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1261,7 +1261,10 @@ class SparkContext(config: SparkConf) extends Logging { /** Post the application start event */ private def postApplicationStart() { - listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser)) + // Note: this code assumes that the task scheduler has been initialized and has contacted + // the cluster manager to get an application ID (in case the cluster manager provides one). + listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(), + startTime, sparkUser)) } /** Post the application end event */ diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index a0e8bd403a41d57268fd317c38f4ae40947373ff..fbe39b27649f636fb638b90f79d90c6501ae4521 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider { * * @return List of all know applications. */ - def getListing(): Seq[ApplicationHistoryInfo] + def getListing(): Iterable[ApplicationHistoryInfo] /** * Returns the Spark UI for a specific application. * * @param appId The application ID. - * @return The application's UI, or null if application is not found. + * @return The application's UI, or None if application is not found. */ - def getAppUI(appId: String): SparkUI + def getAppUI(appId: String): Option[SparkUI] /** * Called when the server is shutting down. diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 05c8a90782c749c9c57136af931a70e93148474e..481f6c93c6a8d33c99456891e5e39cb6255f3fd9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -32,6 +32,8 @@ import org.apache.spark.util.Utils private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider with Logging { + private val NOT_STARTED = "<Not Started>" + // Interval between each check for event log updates private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", conf.getInt("spark.history.updateInterval", 10)) * 1000 @@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis // A timestamp of when the disk was last accessed to check for log updates private var lastLogCheckTimeMs = -1L - // List of applications, in order from newest to oldest. - @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil + // The modification time of the newest log detected during the last scan. This is used + // to ignore logs that are older during subsequent scans, to avoid processing data that + // is already known. + private var lastModifiedTime = -1L + + // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted + // into the map in order, so the LinkedHashMap maintains the correct ordering. + @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] + = new mutable.LinkedHashMap() /** * A background thread that periodically checks for event log updates on disk. @@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis logCheckingThread.start() } - override def getListing() = appList + override def getListing() = applications.values - override def getAppUI(appId: String): SparkUI = { + override def getAppUI(appId: String): Option[SparkUI] = { try { - val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId)) - val (_, ui) = loadAppInfo(appLogDir, renderUI = true) - ui + applications.get(appId).map { info => + val (replayBus, appListener) = createReplayBus(fs.getFileStatus( + new Path(logDir, info.logDir))) + val ui = { + val conf = this.conf.clone() + val appSecManager = new SecurityManager(conf) + new SparkUI(conf, appSecManager, replayBus, appId, + s"${HistoryServer.UI_PATH_PREFIX}/$appId") + // Do not call ui.bind() to avoid creating a new server for each application + } + + replayBus.replay() + + ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)") + + val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) + ui.getSecurityManager.setAcls(uiAclsEnabled) + // make sure to set admin acls before view acls so they are properly picked up + ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) + ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED), + appListener.viewAcls.getOrElse("")) + ui + } } catch { - case e: FileNotFoundException => null + case e: FileNotFoundException => None } } @@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis try { val logStatus = fs.listStatus(new Path(resolvedLogDir)) val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs.filter { dir => - fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE)) - } - val currentApps = Map[String, ApplicationHistoryInfo]( - appList.map(app => app.id -> app):_*) - - // For any application that either (i) is not listed or (ii) has changed since the last time - // the listing was created (defined by the log dir's modification time), load the app's info. - // Otherwise just reuse what's already in memory. - val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size) - for (dir <- logInfos) { - val curr = currentApps.getOrElse(dir.getPath().getName(), null) - if (curr == null || curr.lastUpdated < getModificationTime(dir)) { + // Load all new logs from the log directory. Only directories that have a modification time + // later than the last known log directory will be loaded. + var newLastModifiedTime = lastModifiedTime + val logInfos = logDirs + .filter { dir => + if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) { + val modTime = getModificationTime(dir) + newLastModifiedTime = math.max(newLastModifiedTime, modTime) + modTime > lastModifiedTime + } else { + false + } + } + .flatMap { dir => try { - val (app, _) = loadAppInfo(dir, renderUI = false) - newApps += app + val (replayBus, appListener) = createReplayBus(dir) + replayBus.replay() + Some(new FsApplicationHistoryInfo( + dir.getPath().getName(), + appListener.appId.getOrElse(dir.getPath().getName()), + appListener.appName.getOrElse(NOT_STARTED), + appListener.startTime.getOrElse(-1L), + appListener.endTime.getOrElse(-1L), + getModificationTime(dir), + appListener.sparkUser.getOrElse(NOT_STARTED))) } catch { - case e: Exception => logError(s"Failed to load app info from directory $dir.") + case e: Exception => + logInfo(s"Failed to load application log data from $dir.", e) + None + } + } + .sortBy { info => -info.endTime } + + lastModifiedTime = newLastModifiedTime + + // When there are new logs, merge the new list with the existing one, maintaining + // the expected ordering (descending end time). Maintaining the order is important + // to avoid having to sort the list every time there is a request for the log list. + if (!logInfos.isEmpty) { + val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + def addIfAbsent(info: FsApplicationHistoryInfo) = { + if (!newApps.contains(info.id)) { + newApps += (info.id -> info) } - } else { - newApps += curr } - } - appList = newApps.sortBy { info => -info.endTime } + val newIterator = logInfos.iterator.buffered + val oldIterator = applications.values.iterator.buffered + while (newIterator.hasNext && oldIterator.hasNext) { + if (newIterator.head.endTime > oldIterator.head.endTime) { + addIfAbsent(newIterator.next) + } else { + addIfAbsent(oldIterator.next) + } + } + newIterator.foreach(addIfAbsent) + oldIterator.foreach(addIfAbsent) + + applications = newApps + } } catch { case t: Throwable => logError("Exception in checking for event log updates", t) } } - /** - * Parse the application's logs to find out the information we need to build the - * listing page. - * - * When creating the listing of available apps, there is no need to load the whole UI for the - * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user - * clicks on a specific application. - * - * @param logDir Directory with application's log files. - * @param renderUI Whether to create the SparkUI for the application. - * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false. - */ - private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = { - val path = logDir.getPath - val appId = path.getName + private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = { + val path = logDir.getPath() val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs) val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec) val appListener = new ApplicationEventListener replayBus.addListener(appListener) - - val ui: SparkUI = if (renderUI) { - val conf = this.conf.clone() - val appSecManager = new SecurityManager(conf) - new SparkUI(conf, appSecManager, replayBus, appId, - HistoryServer.UI_PATH_PREFIX + s"/$appId") - // Do not call ui.bind() to avoid creating a new server for each application - } else { - null - } - - replayBus.replay() - val appInfo = ApplicationHistoryInfo( - appId, - appListener.appName, - appListener.startTime, - appListener.endTime, - getModificationTime(logDir), - appListener.sparkUser) - - if (ui != null) { - val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) - ui.getSecurityManager.setAcls(uiAclsEnabled) - // make sure to set admin acls before view acls so properly picked up - ui.getSecurityManager.setAdminAcls(appListener.adminAcls) - ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) - } - (appInfo, ui) + (replayBus, appListener) } /** Return when this directory was last modified. */ @@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000) } + +private class FsApplicationHistoryInfo( + val logDir: String, + id: String, + name: String, + startTime: Long, + endTime: Long, + lastUpdated: Long, + sparkUser: String) + extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index d1a64c1912cb8a1495a2e14497dbb5e8605ff488..ce00c0ffd21e015a9f5a7ce53deaf68f366f9fed 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -52,10 +52,7 @@ class HistoryServer( private val appLoader = new CacheLoader[String, SparkUI] { override def load(key: String): SparkUI = { - val ui = provider.getAppUI(key) - if (ui == null) { - throw new NoSuchElementException() - } + val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException()) attachSparkUI(ui) ui } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala index 162158babc35b0a44142a271a659f86cce53333b..6d39a5e3fa64c06173b6f425e7c7072e702c59cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -24,38 +24,31 @@ package org.apache.spark.scheduler * from multiple applications are seen, the behavior is unspecified. */ private[spark] class ApplicationEventListener extends SparkListener { - var appName = "<Not Started>" - var sparkUser = "<Not Started>" - var startTime = -1L - var endTime = -1L - var viewAcls = "" - var adminAcls = "" - - def applicationStarted = startTime != -1 - - def applicationCompleted = endTime != -1 - - def applicationDuration: Long = { - val difference = endTime - startTime - if (applicationStarted && applicationCompleted && difference > 0) difference else -1L - } + var appName: Option[String] = None + var appId: Option[String] = None + var sparkUser: Option[String] = None + var startTime: Option[Long] = None + var endTime: Option[Long] = None + var viewAcls: Option[String] = None + var adminAcls: Option[String] = None override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { - appName = applicationStart.appName - startTime = applicationStart.time - sparkUser = applicationStart.sparkUser + appName = Some(applicationStart.appName) + appId = applicationStart.appId + startTime = Some(applicationStart.time) + sparkUser = Some(applicationStart.sparkUser) } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - endTime = applicationEnd.time + endTime = Some(applicationEnd.time) } override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { synchronized { val environmentDetails = environmentUpdate.environmentDetails val allProperties = environmentDetails("Spark Properties").toMap - viewAcls = allProperties.getOrElse("spark.ui.view.acls", "") - adminAcls = allProperties.getOrElse("spark.admin.acls", "") + viewAcls = allProperties.get("spark.ui.view.acls") + adminAcls = allProperties.get("spark.admin.acls") } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index e41e0a98416916b8c666f92ad8a5806a548f8c5a..a0be8307eff273c0610ed689375248e367f2f46f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend { def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = throw new UnsupportedOperationException def isReady(): Boolean = true + + /** + * The application ID associated with the job, if any. + * + * @return The application ID, or None if the backend does not provide an ID. + */ + def applicationId(): Option[String] = None + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index f33c2e065a2008030b0d7ce1c40e9407e908febe..86afe3bd5265f1d810feaeae7518474e7628fb11 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -89,8 +89,8 @@ case class SparkListenerExecutorMetricsUpdate( extends SparkListenerEvent @DeveloperApi -case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String) - extends SparkListenerEvent +case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long, + sparkUser: String) extends SparkListenerEvent @DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 1a0b877c8a5e1044c232610ea2aa7ea14ab507e6..1c1ce666eab0f5d6519cf221aff6a05bf1753fba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -64,4 +64,12 @@ private[spark] trait TaskScheduler { */ def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean + + /** + * The application ID associated with the job, if any. + * + * @return The application ID, or None if the backend does not provide an ID. + */ + def applicationId(): Option[String] = None + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ad051e59af86db3508cc81a0ab4c957b20565e3d..633e892554c502a4a6df9ca19de8e1afc7ed4ef9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -491,6 +491,9 @@ private[spark] class TaskSchedulerImpl( } } } + + override def applicationId(): Option[String] = backend.applicationId() + } @@ -535,4 +538,5 @@ private[spark] object TaskSchedulerImpl { retval.toList } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 2a3711ae2a78c8e25ef603111a0b99c40ba97883..5b5257269d92ff052f9cbd5da97e13043807883a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered resources / total expected resources) + // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. var minRegisteredRatio = math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)) // Submit tasks after maxRegisteredWaitingTime milliseconds - // if minRegisteredRatio has not yet been reached + // if minRegisteredRatio has not yet been reached val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000) val createTime = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index bc7670f4a804d99e40ad07bc32ba778e3d8aa379..513d74a08a47f9b3d5d813fb372b9be5ca51a46c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -69,4 +69,5 @@ private[spark] class SimrSchedulerBackend( fs.delete(new Path(driverFilePath), false) super.stop() } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 32138e5246700d98fbf1fb478f0c9ad46f037b14..06872ace2ecf44de650ec9ab1cc73a9ab0480e98 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -34,6 +34,10 @@ private[spark] class SparkDeploySchedulerBackend( var client: AppClient = null var stopping = false var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ + var appId: String = _ + + val registrationLock = new Object() + var registrationDone = false val maxCores = conf.getOption("spark.cores.max").map(_.toInt) val totalExpectedCores = maxCores.getOrElse(0) @@ -68,6 +72,8 @@ private[spark] class SparkDeploySchedulerBackend( client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() + + waitForRegistration() } override def stop() { @@ -81,15 +87,19 @@ private[spark] class SparkDeploySchedulerBackend( override def connected(appId: String) { logInfo("Connected to Spark cluster with app ID " + appId) + this.appId = appId + notifyContext() } override def disconnected() { + notifyContext() if (!stopping) { logWarning("Disconnected from Spark cluster! Waiting for reconnection...") } } override def dead(reason: String) { + notifyContext() if (!stopping) { logError("Application has been killed. Reason: " + reason) scheduler.error(reason) @@ -116,4 +126,22 @@ private[spark] class SparkDeploySchedulerBackend( override def sufficientResourcesRegistered(): Boolean = { totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio } + + override def applicationId(): Option[String] = Option(appId) + + private def waitForRegistration() = { + registrationLock.synchronized { + while (!registrationDone) { + registrationLock.wait() + } + } + } + + private def notifyContext() = { + registrationLock.synchronized { + registrationDone = true + registrationLock.notifyAll() + } + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 87e181e773fdfc270f5daf0c96f766f6ee222a88..da43ef567608c0f40ec7ec3fea9be630ad686ac5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -309,4 +309,5 @@ private[spark] class CoarseMesosSchedulerBackend( logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue)) slaveLost(d, s) } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 67ee4d66f151b603bb1acfa6ac0ce6a73b060de0..a9ef126f5de0e24d885867ca341b30b8580b50a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -349,4 +349,5 @@ private[spark] class MesosSchedulerBackend( // TODO: query Mesos for number of cores override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8) + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index bec9502f204665e01807051bbb5ca42efd6380dd..9ea25c2bc70901d0828cfedf0c4f48d6141183b2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -114,4 +114,5 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) { localActor ! StatusUpdate(taskId, state, serializedData) } + } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index bee6dad3387e5a26c68ef06864b08f8ffb0ad98b..f0006b42aee4f469bac4911bb6b4ba1efb42ba85 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -232,7 +232,7 @@ private[spark] object UIUtils extends Logging { def listingTable[T]( headers: Seq[String], generateDataRow: T => Seq[Node], - data: Seq[T], + data: Iterable[T], fixedWidth: Boolean = false): Seq[Node] = { var listingTableClass = TABLE_CLASS diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 1fc536b0969960efd01619b901d213befa2d7662..b0754e3ce10db5fe9e9f18c3694f765b8fae1ff1 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -171,6 +171,7 @@ private[spark] object JsonProtocol { def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = { ("Event" -> Utils.getFormattedClassName(applicationStart)) ~ ("App Name" -> applicationStart.appName) ~ + ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~ ("Timestamp" -> applicationStart.time) ~ ("User" -> applicationStart.sparkUser) } @@ -484,9 +485,10 @@ private[spark] object JsonProtocol { def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = { val appName = (json \ "App Name").extract[String] + val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String]) val time = (json \ "Timestamp").extract[Long] val sparkUser = (json \ "User").extract[String] - SparkListenerApplicationStart(appName, time, sparkUser) + SparkListenerApplicationStart(appName, appId, time, sparkUser) } def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 41e58a008c533039ce65a0f52b6723ea5e3eabd0..fead8837934309a40c764f9259216adedf004888 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -229,7 +229,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { val conf = getLoggingConf(logDirPath, compressionCodec) val eventLogger = new EventLoggingListener("test", conf) val listenerBus = new LiveListenerBus - val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey") + val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, + 125L, "Mickey") val applicationEnd = SparkListenerApplicationEnd(1000L) // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 8f0ee9f4dbafd7bb8063d2f25ed6e2ea1db4a491..7ab351d1b4d2421360843f599277f0f1854a1946 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -83,7 +83,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { val fstream = fileSystem.create(logFilePath) val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) val writer = new PrintWriter(cstream) - val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey") + val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, + 125L, "Mickey") val applicationEnd = SparkListenerApplicationEnd(1000L) writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index c84bafce37f70bda06ed5ea2a77aeccd0310739a..2b45d8b6958534a920b876095328c69af71cd1cf 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -60,7 +60,7 @@ class JsonProtocolSuite extends FunSuite { val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, BlockManagerId("Scarce", "to be counted...", 100)) val unpersistRdd = SparkListenerUnpersistRDD(12345) - val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield") + val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield") val applicationEnd = SparkListenerApplicationEnd(42L) testEvent(stageSubmitted, stageSubmittedJsonString) @@ -176,6 +176,13 @@ class JsonProtocolSuite extends FunSuite { deserializedBmRemoved) } + test("SparkListenerApplicationStart backwards compatibility") { + // SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property. + val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user") + val oldEvent = JsonProtocol.applicationStartToJson(applicationStart) + .removeField({ _._1 == "App ID" }) + assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent)) + } /** -------------------------- * | Helper test running methods | diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index a2f1b3582ab71e601e2ffa4a555021f9bf9120b5..855d5cc8cf3fd046709ca3cf5778ff475987da23 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -111,6 +111,8 @@ object MimaExcludes { MimaBuild.excludeSparkClass("storage.Values") ++ MimaBuild.excludeSparkClass("storage.Entry") ++ MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++ + // Class was missing "@DeveloperApi" annotation in 1.0. + MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++ Seq( ProblemFilters.exclude[IncompatibleMethTypeProblem]( "org.apache.spark.mllib.tree.impurity.Gini.calculate"), @@ -119,14 +121,14 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleMethTypeProblem]( "org.apache.spark.mllib.tree.impurity.Variance.calculate") ) ++ - Seq ( // Package-private classes removed in SPARK-2341 + Seq( // Package-private classes removed in SPARK-2341 ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser$"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$") - ) ++ + ) ++ Seq( // package-private classes removed in MLlib ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne") diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8c548409719daeafda0ea854fad009e5b661a304..98039a20de245d76d0f1d46c496e6bfe5491cb05 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} @@ -70,6 +71,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private val sparkContextRef = new AtomicReference[SparkContext](null) final def run(): Int = { + val appAttemptId = client.getAttemptId() + if (isDriver) { // Set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box @@ -77,9 +80,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // Set the master property to match the requested mode. System.setProperty("spark.master", "yarn-cluster") + + // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) } - logInfo("ApplicationAttemptId: " + client.getAttemptId()) + logInfo("ApplicationAttemptId: " + appAttemptId) val cleanupHook = new Runnable { override def run() { @@ -151,13 +157,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkContextRef.compareAndSet(sc, null) } - private def registerAM(uiAddress: String, uiHistoryAddress: String) = { + private def registerAM(uiAddress: String) = { val sc = sparkContextRef.get() + + val appId = client.getAttemptId().getApplicationId().toString() + val historyAddress = + sparkConf.getOption("spark.yarn.historyServer.address") + .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" } + .getOrElse("") + allocator = client.register(yarnConf, if (sc != null) sc.getConf else sparkConf, if (sc != null) sc.preferredNodeLocationData else Map(), uiAddress, - uiHistoryAddress) + historyAddress) allocator.allocateResources() reporterThread = launchReporterThread() @@ -175,7 +188,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc == null) { finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") } else { - registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf)) + registerAM(sc.ui.appUIHostPort) try { userThread.join() } finally { @@ -190,8 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, conf = sparkConf, securityManager = securityMgr)._1 actor = waitForSparkDriver() addAmIpFilter() - registerAM(sparkConf.get("spark.driver.appUIAddress", ""), - sparkConf.get("spark.driver.appUIHistoryAddress", "")) + registerAM(sparkConf.get("spark.driver.appUIAddress", "")) // In client mode the actor will stop the reporter thread. reporterThread.join() diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index ffe2731ca1d17812c097065a4513cf9164306c35..dc77f1236492dd51d21a41ef0033c3ebd645c1a1 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -156,19 +155,6 @@ object YarnSparkHadoopUtil { } } - def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = { - val eventLogDir = sc.eventLogger match { - case Some(logger) => logger.getApplicationLogDir() - case None => "" - } - val historyServerAddress = conf.get("spark.yarn.historyServer.address", "") - if (historyServerAddress != "" && eventLogDir != "") { - historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir" - } else { - "" - } - } - /** * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index a5f537dd9de307a690c3db679ef3074542513093..41c662cd7a6de47528a9b801bd6113799775528c 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -56,7 +56,6 @@ private[spark] class YarnClientSchedulerBackend( val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort) - conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf)) val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ( @@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend( override def sufficientResourcesRegistered(): Boolean = { totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } + + override def applicationId(): Option[String] = Option(appId).map(_.toString()) + } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 55665220a6f96a278406398a2776c9773df867d2..39436d09996634b736957455682f0b33cfa3840c 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -28,7 +28,7 @@ private[spark] class YarnClusterSchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { var totalExpectedExecutors = 0 - + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { minRegisteredRatio = 0.8 } @@ -47,4 +47,7 @@ private[spark] class YarnClusterSchedulerBackend( override def sufficientResourcesRegistered(): Boolean = { totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } + + override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id") + }