diff --git a/bin/spark-class b/bin/spark-class index 76fde3e448891ea876d54ce255b2b9e5e73ff5ea..1b0d309cc5b1c0d37a24327cef441293d5862e8b 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m} SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" -# Add java opts and memory settings for master, worker, executors, and repl. +# Add java opts and memory settings for master, worker, history server, executors, and repl. case "$1" in - # Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. + # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. 'org.apache.spark.deploy.master.Master') OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS" OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM} @@ -58,6 +58,10 @@ case "$1" in OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS" OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM} ;; + 'org.apache.spark.deploy.history.HistoryServer') + OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS" + OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM} + ;; # Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY. 'org.apache.spark.executor.CoarseGrainedExecutorBackend') diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd index f488cfdbeceb6a4aed3a629313f8edb7bc048c21..4302c1b6b7ff48783bd8a604fc361b17293a0133 100755 --- a/bin/spark-class2.cmd +++ b/bin/spark-class2.cmd @@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true -rem Add java opts and memory settings for master, worker, executors, and repl. -rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. +rem Add java opts and memory settings for master, worker, history server, executors, and repl. +rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. if "%1"=="org.apache.spark.deploy.master.Master" ( set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS% if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY% ) else if "%1"=="org.apache.spark.deploy.worker.Worker" ( set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS% if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY% +) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" ( + set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS% + if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY% rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY. ) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" ( diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 76305237b03d5e048e73cb0267b1d4466f9f7525..e6c9b7000d819446dad3f360ca71aef10dfa1d9a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -219,15 +219,12 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val eventLogger: Option[EventLoggingListener] = { if (conf.getBoolean("spark.eventLog.enabled", false)) { val logger = new EventLoggingListener(appName, conf) + logger.start() listenerBus.addListener(logger) Some(logger) } else None } - // Information needed to replay logged events, if any - private[spark] val eventLoggingInfo: Option[EventLoggingInfo] = - eventLogger.map { logger => Some(logger.info) }.getOrElse(None) - // At this point, all relevant SparkListeners have been registered, so begin releasing events listenerBus.start() @@ -292,6 +289,7 @@ class SparkContext(config: SparkConf) extends Logging { cleaner.foreach(_.start()) postEnvironmentUpdate() + postApplicationStart() /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration: Configuration = { @@ -777,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging { listenerBus.addListener(listener) } + /** The version of Spark on which this application is running. */ + def version = SparkContext.SPARK_VERSION + /** * Return a map from the slave to the max memory available for caching and the remaining * memory available for caching. @@ -930,6 +931,7 @@ class SparkContext(config: SparkConf) extends Logging { /** Shut down the SparkContext. */ def stop() { + postApplicationEnd() ui.stop() // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. @@ -1175,6 +1177,20 @@ class SparkContext(config: SparkConf) extends Logging { /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + /** Post the application start event */ + private def postApplicationStart() { + listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser)) + } + + /** + * Post the application end event to all listeners immediately, rather than adding it + * to the event queue for it to be asynchronously processed eventually. Otherwise, a race + * condition exists in which the listeners may stop before this event has been propagated. + */ + private def postApplicationEnd() { + listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis)) + } + /** Post the environment update event once the task scheduler is ready */ private def postEnvironmentUpdate() { if (taskScheduler != null) { @@ -1200,6 +1216,8 @@ class SparkContext(config: SparkConf) extends Logging { */ object SparkContext extends Logging { + private[spark] val SPARK_VERSION = "1.0.0" + private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 15fa8a76798741c430bc37cf1751506e928d5501..86305d2ea8a092075a4e66556ebbe2a7957ea46a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -17,8 +17,6 @@ package org.apache.spark.deploy -import org.apache.spark.scheduler.EventLoggingInfo - private[spark] class ApplicationDescription( val name: String, val maxCores: Option[Int], @@ -26,7 +24,7 @@ private[spark] class ApplicationDescription( val command: Command, val sparkHome: Option[String], var appUiUrl: String, - val eventLogInfo: Option[EventLoggingInfo] = None) + val eventLogDir: Option[String] = None) extends Serializable { val user = System.getProperty("user.name", "<unknown>") diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala new file mode 100644 index 0000000000000000000000000000000000000000..33fceae4ff489480690e70205e5af93ce2cf45c2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import org.apache.spark.ui.{SparkUI, WebUI} + +private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) { + + /** Attach a SparkUI to this container. Only valid after bind(). */ + def attachUI(ui: SparkUI) { + assert(serverInfo.isDefined, + "%s must be bound to a server before attaching SparkUIs".format(name)) + val rootHandler = serverInfo.get.rootHandler + for (handler <- ui.handlers) { + rootHandler.addHandler(handler) + if (!handler.isStarted) { + handler.start() + } + } + } + + /** Detach a SparkUI from this container. Only valid after bind(). */ + def detachUI(ui: SparkUI) { + assert(serverInfo.isDefined, + "%s must be bound to a server before detaching SparkUIs".format(name)) + val rootHandler = serverInfo.get.rootHandler + for (handler <- ui.handlers) { + if (handler.isStarted) { + handler.stop() + } + rootHandler.removeHandler(handler) + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala new file mode 100644 index 0000000000000000000000000000000000000000..97d2ba9deed332d706d6d30949ed0d1415586899 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import javax.servlet.http.HttpServletRequest + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.eclipse.jetty.servlet.ServletContextHandler + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.deploy.SparkUIContainer +import org.apache.spark.scheduler._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.util.Utils + +/** + * A web server that renders SparkUIs of completed applications. + * + * For the standalone mode, MasterWebUI already achieves this functionality. Thus, the + * main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos). + * + * The logging directory structure is as follows: Within the given base directory, each + * application's event logs are maintained in the application's own sub-directory. This + * is the same structure as maintained in the event log write code path in + * EventLoggingListener. + * + * @param baseLogDir The base directory in which event logs are found + */ +class HistoryServer( + val baseLogDir: String, + conf: SparkConf) + extends SparkUIContainer("History Server") with Logging { + + import HistoryServer._ + + private val fileSystem = Utils.getHadoopFileSystem(baseLogDir) + private val localHost = Utils.localHostName() + private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) + private val port = WEB_UI_PORT + private val securityManager = new SecurityManager(conf) + private val indexPage = new IndexPage(this) + + // A timestamp of when the disk was last accessed to check for log updates + private var lastLogCheckTime = -1L + + // Number of completed applications found in this directory + private var numCompletedApplications = 0 + + @volatile private var stopped = false + + /** + * A background thread that periodically checks for event log updates on disk. + * + * If a log check is invoked manually in the middle of a period, this thread re-adjusts the + * time at which it performs the next log check to maintain the same period as before. + * + * TODO: Add a mechanism to update manually. + */ + private val logCheckingThread = new Thread { + override def run() { + while (!stopped) { + val now = System.currentTimeMillis + if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { + checkForLogs() + Thread.sleep(UPDATE_INTERVAL_MS) + } else { + // If the user has manually checked for logs recently, wait until + // UPDATE_INTERVAL_MS after the last check time + Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + } + } + } + } + + private val handlers = Seq[ServletContextHandler]( + createStaticHandler(STATIC_RESOURCE_DIR, "/static"), + createServletHandler("/", + (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager) + ) + + // A mapping of application ID to its history information, which includes the rendered UI + val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]() + + /** + * Start the history server. + * + * This starts a background thread that periodically synchronizes information displayed on + * this UI with the event logs in the provided base directory. + */ + def start() { + logCheckingThread.start() + } + + /** Bind to the HTTP server behind this web interface. */ + override def bind() { + try { + serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf)) + logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort)) + } catch { + case e: Exception => + logError("Failed to bind HistoryServer", e) + System.exit(1) + } + } + + /** + * Check for any updates to event logs in the base directory. This is only effective once + * the server has been bound. + * + * If a new completed application is found, the server renders the associated SparkUI + * from the application's event logs, attaches this UI to itself, and stores metadata + * information for this application. + * + * If the logs for an existing completed application are no longer found, the server + * removes all associated information and detaches the SparkUI. + */ + def checkForLogs() = synchronized { + if (serverInfo.isDefined) { + lastLogCheckTime = System.currentTimeMillis + logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime)) + try { + val logStatus = fileSystem.listStatus(new Path(baseLogDir)) + val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() + val logInfos = logDirs + .sortBy { dir => getModificationTime(dir) } + .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) } + .filter { case (dir, info) => info.applicationComplete } + + // Logging information for applications that should be retained + val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS) + val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName } + + // Remove any applications that should no longer be retained + appIdToInfo.foreach { case (appId, info) => + if (!retainedAppIds.contains(appId)) { + detachUI(info.ui) + appIdToInfo.remove(appId) + } + } + + // Render the application's UI if it is not already there + retainedLogInfos.foreach { case (dir, info) => + val appId = dir.getPath.getName + if (!appIdToInfo.contains(appId)) { + renderSparkUI(dir, info) + } + } + + // Track the total number of completed applications observed this round + numCompletedApplications = logInfos.size + + } catch { + case t: Throwable => logError("Exception in checking for event log updates", t) + } + } else { + logWarning("Attempted to check for event log updates before binding the server.") + } + } + + /** + * Render a new SparkUI from the event logs if the associated application is completed. + * + * HistoryServer looks for a special file that indicates application completion in the given + * directory. If this file exists, the associated application is regarded to be completed, in + * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing. + */ + private def renderSparkUI(logDir: FileStatus, logInfo: EventLoggingInfo) { + val path = logDir.getPath + val appId = path.getName + val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec) + val ui = new SparkUI(replayBus, appId, "/history/" + appId) + val appListener = new ApplicationEventListener + replayBus.addListener(appListener) + + // Do not call ui.bind() to avoid creating a new server for each application + ui.start() + replayBus.replay() + if (appListener.applicationStarted) { + attachUI(ui) + val appName = appListener.appName + val sparkUser = appListener.sparkUser + val startTime = appListener.startTime + val endTime = appListener.endTime + val lastUpdated = getModificationTime(logDir) + ui.setAppName(appName + " (completed)") + appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime, + lastUpdated, sparkUser, path, ui) + } + } + + /** Stop the server and close the file system. */ + override def stop() { + super.stop() + stopped = true + fileSystem.close() + } + + /** Return the address of this server. */ + def getAddress: String = "http://" + publicHost + ":" + boundPort + + /** Return the number of completed applications found, whether or not the UI is rendered. */ + def getNumApplications: Int = numCompletedApplications + + /** Return when this directory was last modified. */ + private def getModificationTime(dir: FileStatus): Long = { + try { + val logFiles = fileSystem.listStatus(dir.getPath) + if (logFiles != null && !logFiles.isEmpty) { + logFiles.map(_.getModificationTime).max + } else { + dir.getModificationTime + } + } catch { + case t: Throwable => + logError("Exception in accessing modification time of %s".format(dir.getPath), t) + -1L + } + } +} + +/** + * The recommended way of starting and stopping a HistoryServer is through the scripts + * start-history-server.sh and stop-history-server.sh. The path to a base log directory + * is must be specified, while the requested UI port is optional. For example: + * + * ./sbin/spark-history-server.sh /tmp/spark-events + * ./sbin/spark-history-server.sh hdfs://1.2.3.4:9000/spark-events + * + * This launches the HistoryServer as a Spark daemon. + */ +object HistoryServer { + private val conf = new SparkConf + + // Interval between each check for event log updates + val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000 + + // How many applications to retain + val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250) + + // The port to which the web UI is bound + val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080) + + val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR + + def main(argStrings: Array[String]) { + val args = new HistoryServerArguments(argStrings) + val server = new HistoryServer(args.logDir, conf) + server.bind() + server.start() + + // Wait until the end of the world... or if the HistoryServer process is manually stopped + while(true) { Thread.sleep(Int.MaxValue) } + server.stop() + } +} + + +private[spark] case class ApplicationHistoryInfo( + id: String, + name: String, + startTime: Long, + endTime: Long, + lastUpdated: Long, + sparkUser: String, + logDirPath: Path, + ui: SparkUI) { + def started = startTime != -1 + def completed = endTime != -1 +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala new file mode 100644 index 0000000000000000000000000000000000000000..943c061743dbd16fc94d29eb58f57c6dc671496a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.net.URI + +import org.apache.hadoop.fs.Path + +import org.apache.spark.util.Utils + +/** + * Command-line parser for the master. + */ +private[spark] class HistoryServerArguments(args: Array[String]) { + var logDir = "" + + parse(args.toList) + + private def parse(args: List[String]): Unit = { + args match { + case ("--dir" | "-d") :: value :: tail => + logDir = value + parse(tail) + + case ("--help" | "-h") :: tail => + printUsageAndExit(0) + + case Nil => + + case _ => + printUsageAndExit(1) + } + validateLogDir() + } + + private def validateLogDir() { + if (logDir == "") { + System.err.println("Logging directory must be specified.") + printUsageAndExit(1) + } + val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) + val path = new Path(logDir) + if (!fileSystem.exists(path)) { + System.err.println("Logging directory specified does not exist: %s".format(logDir)) + printUsageAndExit(1) + } + if (!fileSystem.getFileStatus(path).isDir) { + System.err.println("Logging directory specified is not a directory: %s".format(logDir)) + printUsageAndExit(1) + } + } + + private def printUsageAndExit(exitCode: Int) { + System.err.println( + "Usage: HistoryServer [options]\n" + + "\n" + + "Options:\n" + + " -d DIR, --dir DIR Location of event log files") + System.exit(exitCode) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala new file mode 100644 index 0000000000000000000000000000000000000000..54dffffec71c5b327edfce6e3843e9fbbc097a03 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.ui.{UIUtils, WebUI} + +private[spark] class IndexPage(parent: HistoryServer) { + + def render(request: HttpServletRequest): Seq[Node] = { + val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } + val appTable = UIUtils.listingTable(appHeader, appRow, appRows) + val content = + <div class="row-fluid"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li> + </ul> + { + if (parent.appIdToInfo.size > 0) { + <h4> + Showing {parent.appIdToInfo.size}/{parent.getNumApplications} + Completed Application{if (parent.getNumApplications > 1) "s" else ""} + </h4> ++ + appTable + } else { + <h4>No Completed Applications Found</h4> + } + } + </div> + </div> + UIUtils.basicSparkPage(content, "History Server") + } + + private val appHeader = Seq( + "App Name", + "Started", + "Completed", + "Duration", + "Spark User", + "Log Directory", + "Last Updated") + + private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { + val appName = if (info.started) info.name else info.logDirPath.getName + val uiAddress = parent.getAddress + info.ui.basePath + val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started" + val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed" + val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L + val duration = if (difference > 0) WebUI.formatDuration(difference) else "---" + val sparkUser = if (info.started) info.sparkUser else "Unknown user" + val logDirectory = info.logDirPath.getName + val lastUpdated = WebUI.formatDate(info.lastUpdated) + <tr> + <td><a href={uiAddress}>{appName}</a></td> + <td>{startTime}</td> + <td>{endTime}</td> + <td>{duration}</td> + <td>{sparkUser}</td> + <td>{logDirectory}</td> + <td>{lastUpdated}</td> + </tr> + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 95bd62e88db2b718fa50f0ed64177e2ea4de81b8..2446e86cb6672b39a26ca9d023d1eceee42c1543 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -29,6 +29,7 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import akka.serialization.SerializationExtension +import org.apache.hadoop.fs.FileSystem import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState} @@ -37,7 +38,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.scheduler.ReplayListenerBus +import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{AkkaUtils, Utils} @@ -45,7 +46,8 @@ private[spark] class Master( host: String, port: Int, webUiPort: Int, - val securityMgr: SecurityManager) extends Actor with Logging { + val securityMgr: SecurityManager) + extends Actor with Logging { import context.dispatcher // to use Akka's scheduler.schedule() @@ -71,6 +73,7 @@ private[spark] class Master( var nextAppNumber = 0 val appIdToUI = new HashMap[String, SparkUI] + val fileSystemsUsed = new HashSet[FileSystem] val drivers = new HashSet[DriverInfo] val completedDrivers = new ArrayBuffer[DriverInfo] @@ -149,6 +152,7 @@ private[spark] class Master( override def postStop() { webUi.stop() + fileSystemsUsed.foreach(_.close()) masterMetricsSystem.stop() applicationMetricsSystem.stop() persistenceEngine.close() @@ -630,11 +634,7 @@ private[spark] class Master( waitingApps -= app // If application events are logged, use them to rebuild the UI - startPersistedSparkUI(app).map { ui => - app.desc.appUiUrl = ui.basePath - appIdToUI(app.id) = ui - webUi.attachUI(ui) - }.getOrElse { + if (!rebuildSparkUI(app)) { // Avoid broken links if the UI is not reconstructed app.desc.appUiUrl = "" } @@ -654,30 +654,34 @@ private[spark] class Master( } /** - * Start a new SparkUI rendered from persisted storage. If this is unsuccessful for any reason, - * return None. Otherwise return the reconstructed UI. + * Rebuild a new SparkUI from the given application's event logs. + * Return whether this is successful. */ - def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = { + def rebuildSparkUI(app: ApplicationInfo): Boolean = { val appName = app.desc.name - val eventLogInfo = app.desc.eventLogInfo.getOrElse { return None } - val eventLogDir = eventLogInfo.logDir - val eventCompressionCodec = eventLogInfo.compressionCodec - val appConf = new SparkConf - eventCompressionCodec.foreach { codec => - appConf.set("spark.eventLog.compress", "true") - appConf.set("spark.io.compression.codec", codec) - } - val replayerBus = new ReplayListenerBus(appConf) - val ui = new SparkUI( - appConf, - replayerBus, - "%s (finished)".format(appName), - "/history/%s".format(app.id)) - - // Do not call ui.bind() to avoid creating a new server for each application - ui.start() - val success = replayerBus.replay(eventLogDir) - if (success) Some(ui) else None + val eventLogDir = app.desc.eventLogDir.getOrElse { return false } + val fileSystem = Utils.getHadoopFileSystem(eventLogDir) + val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem) + val eventLogPaths = eventLogInfo.logPaths + val compressionCodec = eventLogInfo.compressionCodec + if (!eventLogPaths.isEmpty) { + try { + val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) + val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id) + ui.start() + replayBus.replay() + app.desc.appUiUrl = ui.basePath + appIdToUI(app.id) = ui + webUi.attachUI(ui) + return true + } catch { + case t: Throwable => + logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t) + } + } else { + logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir)) + } + false } /** Generate a new app ID given a app's submission date */ diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 01d9f52f4b7b4b1e05af713cb5dff114434b9101..30c8ade408a5af2948cbf5269f15739cd0d8dd01 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -22,8 +22,9 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.Logging +import org.apache.spark.deploy.SparkUIContainer import org.apache.spark.deploy.master.Master -import org.apache.spark.ui.{ServerInfo, SparkUI} +import org.apache.spark.ui.SparkUI import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -31,7 +32,9 @@ import org.apache.spark.util.{AkkaUtils, Utils} * Web UI server for the standalone master. */ private[spark] -class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { +class MasterWebUI(val master: Master, requestedPort: Int) + extends SparkUIContainer("MasterWebUI") with Logging { + val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) @@ -39,7 +42,6 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { private val port = requestedPort private val applicationPage = new ApplicationPage(this) private val indexPage = new IndexPage(this) - private var serverInfo: Option[ServerInfo] = None private val handlers: Seq[ServletContextHandler] = { master.masterMetricsSystem.getServletHandlers ++ @@ -57,47 +59,18 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { ) } - def bind() { + /** Bind to the HTTP server behind this web interface. */ + override def bind() { try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf)) logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => - logError("Failed to create Master JettyUtils", e) + logError("Failed to create Master web UI", e) System.exit(1) } } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - - /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ - def attachUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { - rootHandler.addHandler(handler) - if (!handler.isStarted) { - handler.start() - } - } - } - - /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */ - def detachUI(ui: SparkUI) { - assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { - if (handler.isStarted) { - handler.stop() - } - rootHandler.removeHandler(handler) - } - } - - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!") - serverInfo.get.server.stop() - } } private[spark] object MasterWebUI { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 650f3da5ce3ff1bf7f863bbb1630b2f400b1ebca..5625a44549aaaf2203555898519d390979d24b1c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -24,7 +24,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.Logging import org.apache.spark.deploy.worker.Worker -import org.apache.spark.ui.{JettyUtils, ServerInfo, SparkUI, UIUtils} +import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -33,15 +33,14 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) - extends Logging { + extends WebUI("WorkerWebUI") with Logging { val timeout = AkkaUtils.askTimeout(worker.conf) private val host = Utils.localHostName() private val port = requestedPort.getOrElse( - worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) + worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) private val indexPage = new IndexPage(this) - private var serverInfo: Option[ServerInfo] = None private val handlers: Seq[ServletContextHandler] = { worker.metricsSystem.getServletHandlers ++ @@ -58,19 +57,18 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I ) } - def bind() { + /** Bind to the HTTP server behind this web interface. */ + override def bind() { try { - serverInfo = Some(JettyUtils.startJettyServer("0.0.0.0", port, handlers, worker.conf)) + serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf)) logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => - logError("Failed to create Worker JettyUtils", e) + logError("Failed to create Worker web UI", e) System.exit(1) } } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - private def log(request: HttpServletRequest): String = { val defaultBytes = 100 * 1024 @@ -187,13 +185,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I (startByte, endByte) } - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!") - serverInfo.get.server.stop() - } } private[spark] object WorkerWebUI { + val DEFAULT_PORT=8081 val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR - val DEFAULT_PORT="8081" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala new file mode 100644 index 0000000000000000000000000000000000000000..affda13df6531d5bb032146ca2c12dc52c16c0f4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +/** + * A simple listener for application events. + * + * This listener expects to hear events from a single application only. If events + * from multiple applications are seen, the behavior is unspecified. + */ +private[spark] class ApplicationEventListener extends SparkListener { + var appName = "<Not Started>" + var sparkUser = "<Not Started>" + var startTime = -1L + var endTime = -1L + + def applicationStarted = startTime != -1 + + def applicationFinished = endTime != -1 + + def applicationDuration: Long = { + val difference = endTime - startTime + if (applicationStarted && applicationFinished && difference > 0) difference else -1L + } + + override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { + appName = applicationStart.appName + startTime = applicationStart.time + sparkUser = applicationStart.sparkUser + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { + endTime = applicationEnd.time + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 217f8825c2ae93df574f62273b532c4a5a37428e..b983c16af14f423184442471115a01860141632f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -17,11 +17,14 @@ package org.apache.spark.scheduler +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileSystem, Path} import org.json4s.jackson.JsonMethods._ -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.io.CompressionCodec -import org.apache.spark.util.{JsonProtocol, FileLogger} +import org.apache.spark.util.{FileLogger, JsonProtocol} /** * A SparkListener that logs events to persistent storage. @@ -36,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, FileLogger} private[spark] class EventLoggingListener(appName: String, conf: SparkConf) extends SparkListener with Logging { + import EventLoggingListener._ + private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false) private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false) private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024 @@ -46,17 +51,21 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) private val logger = new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite) - // Information needed to replay the events logged by this listener later - val info = { - val compressionCodec = if (shouldCompress) { - Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)) - } else None - EventLoggingInfo(logDir, compressionCodec) + /** + * Begin logging events. + * If compression is used, log a file that indicates which compression library is used. + */ + def start() { + logInfo("Logging events to %s".format(logDir)) + if (shouldCompress) { + val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC) + logger.newFile(COMPRESSION_CODEC_PREFIX + codec) + } + logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION) + logger.newFile(LOG_PREFIX + logger.fileIndex) } - logInfo("Logging events to %s".format(logDir)) - - /** Log the event as JSON */ + /** Log the event as JSON. */ private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) { val eventJson = compact(render(JsonProtocol.sparkEventToJson(event))) logger.logLine(eventJson) @@ -90,9 +99,118 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf) logEvent(event, flushLogger = true) override def onUnpersistRDD(event: SparkListenerUnpersistRDD) = logEvent(event, flushLogger = true) + override def onApplicationStart(event: SparkListenerApplicationStart) = + logEvent(event, flushLogger = true) + override def onApplicationEnd(event: SparkListenerApplicationEnd) = + logEvent(event, flushLogger = true) + + /** + * Stop logging events. + * In addition, create an empty special file to indicate application completion. + */ + def stop() = { + logger.newFile(APPLICATION_COMPLETE) + logger.stop() + } +} + +private[spark] object EventLoggingListener extends Logging { + val LOG_PREFIX = "EVENT_LOG_" + val SPARK_VERSION_PREFIX = "SPARK_VERSION_" + val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_" + val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" + + // A cache for compression codecs to avoid creating the same codec many times + private val codecMap = new mutable.HashMap[String, CompressionCodec] + + def isEventLogFile(fileName: String): Boolean = { + fileName.startsWith(LOG_PREFIX) + } + + def isSparkVersionFile(fileName: String): Boolean = { + fileName.startsWith(SPARK_VERSION_PREFIX) + } + + def isCompressionCodecFile(fileName: String): Boolean = { + fileName.startsWith(COMPRESSION_CODEC_PREFIX) + } + + def isApplicationCompleteFile(fileName: String): Boolean = { + fileName == APPLICATION_COMPLETE + } + + def parseSparkVersion(fileName: String): String = { + if (isSparkVersionFile(fileName)) { + fileName.replaceAll(SPARK_VERSION_PREFIX, "") + } else "" + } + + def parseCompressionCodec(fileName: String): String = { + if (isCompressionCodecFile(fileName)) { + fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "") + } else "" + } + + /** + * Parse the event logging information associated with the logs in the given directory. + * + * Specifically, this looks for event log files, the Spark version file, the compression + * codec file (if event logs are compressed), and the application completion file (if the + * application has run to completion). + */ + def parseLoggingInfo(logDir: Path, fileSystem: FileSystem): EventLoggingInfo = { + try { + val fileStatuses = fileSystem.listStatus(logDir) + val filePaths = + if (fileStatuses != null) { + fileStatuses.filter(!_.isDir).map(_.getPath).toSeq + } else { + Seq[Path]() + } + if (filePaths.isEmpty) { + logWarning("No files found in logging directory %s".format(logDir)) + } + EventLoggingInfo( + logPaths = filePaths.filter { path => isEventLogFile(path.getName) }, + sparkVersion = filePaths + .find { path => isSparkVersionFile(path.getName) } + .map { path => parseSparkVersion(path.getName) } + .getOrElse("<Unknown>"), + compressionCodec = filePaths + .find { path => isCompressionCodecFile(path.getName) } + .map { path => + val codec = EventLoggingListener.parseCompressionCodec(path.getName) + val conf = new SparkConf + conf.set("spark.io.compression.codec", codec) + codecMap.getOrElseUpdate(codec, CompressionCodec.createCodec(conf)) + }, + applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) } + ) + } catch { + case t: Throwable => + logError("Exception in parsing logging info from directory %s".format(logDir), t) + EventLoggingInfo.empty + } + } - def stop() = logger.stop() + /** + * Parse the event logging information associated with the logs in the given directory. + */ + def parseLoggingInfo(logDir: String, fileSystem: FileSystem): EventLoggingInfo = { + parseLoggingInfo(new Path(logDir), fileSystem) + } } -// If compression is not enabled, compressionCodec is None -private[spark] case class EventLoggingInfo(logDir: String, compressionCodec: Option[String]) + +/** + * Information needed to process the event logs associated with an application. + */ +private[spark] case class EventLoggingInfo( + logPaths: Seq[Path], + sparkVersion: String, + compressionCodec: Option[CompressionCodec], + applicationComplete: Boolean = false) + +private[spark] object EventLoggingInfo { + def empty = EventLoggingInfo(Seq[Path](), "<Unknown>", None, applicationComplete = false) +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index db76178b65501f2b614732fdee1e15a03203518e..b03665fd56d3364b8be2b7040d4b0fb3f6deaf94 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -18,7 +18,6 @@ package org.apache.spark.scheduler import java.io.InputStream -import java.net.URI import scala.io.Source @@ -26,63 +25,47 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream import org.apache.hadoop.fs.{Path, FileSystem} import org.json4s.jackson.JsonMethods._ -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.Logging import org.apache.spark.io.CompressionCodec -import org.apache.spark.util.{JsonProtocol, Utils} +import org.apache.spark.util.JsonProtocol /** - * An EventBus that replays logged events from persisted storage + * A SparkListenerBus that replays logged events from persisted storage. + * + * This class expects files to be appropriately prefixed as specified in EventLoggingListener. + * There exists a one-to-one mapping between ReplayListenerBus and event logging applications. */ -private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus with Logging { - private val compressed = conf.getBoolean("spark.eventLog.compress", false) +private[spark] class ReplayListenerBus( + logPaths: Seq[Path], + fileSystem: FileSystem, + compressionCodec: Option[CompressionCodec]) + extends SparkListenerBus with Logging { - // Only used if compression is enabled - private lazy val compressionCodec = CompressionCodec.createCodec(conf) + private var replayed = false - /** - * Return a list of paths representing log files in the given directory. - */ - private def getLogFilePaths(logDir: String, fileSystem: FileSystem): Array[Path] = { - val path = new Path(logDir) - if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) { - logWarning("Log path provided is not a valid directory: %s".format(logDir)) - return Array[Path]() - } - val logStatus = fileSystem.listStatus(path) - if (logStatus == null || !logStatus.exists(!_.isDir)) { - logWarning("Log path provided contains no log files: %s".format(logDir)) - return Array[Path]() - } - logStatus.filter(!_.isDir).map(_.getPath).sortBy(_.getName) + if (logPaths.length == 0) { + logWarning("Log path provided contains no log files.") } /** * Replay each event in the order maintained in the given logs. + * This should only be called exactly once. */ - def replay(logDir: String): Boolean = { - val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - val logPaths = getLogFilePaths(logDir, fileSystem) - if (logPaths.length == 0) { - return false - } - + def replay() { + assert(!replayed, "ReplayListenerBus cannot replay events more than once") logPaths.foreach { path => // Keep track of input streams at all levels to close them later // This is necessary because an exception can occur in between stream initializations var fileStream: Option[InputStream] = None var bufferedStream: Option[InputStream] = None var compressStream: Option[InputStream] = None - var currentLine = "" + var currentLine = "<not started>" try { - currentLine = "<not started>" fileStream = Some(fileSystem.open(path)) bufferedStream = Some(new FastBufferedInputStream(fileStream.get)) - compressStream = - if (compressed) { - Some(compressionCodec.compressedInputStream(bufferedStream.get)) - } else bufferedStream + compressStream = Some(wrapForCompression(bufferedStream.get)) - // Parse each line as an event and post it to all attached listeners + // Parse each line as an event and post the event to all attached listeners val lines = Source.fromInputStream(compressStream.get).getLines() lines.foreach { line => currentLine = line @@ -98,7 +81,11 @@ private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus compressStream.foreach(_.close()) } } - fileSystem.close() - true + replayed = true + } + + /** If a compression codec is specified, wrap the given stream in a compression stream. */ + private def wrapForCompression(stream: InputStream): InputStream = { + compressionCodec.map(_.compressedInputStream(stream)).getOrElse(stream) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index ced20350d53565fc292849616267ba35de1757a7..378cf1aaebe7b0f701629a2a048b07484663883f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -75,6 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) @DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent +case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String) + extends SparkListenerEvent + +case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent + /** An event used in the listener to shutdown the listener daemon thread. */ private[spark] case object SparkListenerShutdown extends SparkListenerEvent @@ -141,6 +146,16 @@ trait SparkListener { * Called when an RDD is manually unpersisted by the application */ def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { } + + /** + * Called when the application starts + */ + def onApplicationStart(applicationStart: SparkListenerApplicationStart) { } + + /** + * Called when the application ends + */ + def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 729e1204975719ac1fc1ad74db8ee19e9cd1e253..d6df193d9bcf8885b6ef941e00aa067f54a24794 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -61,6 +61,10 @@ private[spark] trait SparkListenerBus { sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved)) case unpersistRDD: SparkListenerUnpersistRDD => sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD)) + case applicationStart: SparkListenerApplicationStart => + sparkListeners.foreach(_.onApplicationStart(applicationStart)) + case applicationEnd: SparkListenerApplicationEnd => + sparkListeners.foreach(_.onApplicationEnd(applicationEnd)) case SparkListenerShutdown => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 25b7472a99cdb6ed60029477acff318fe14b5eec..936e9db80573d81b75b2bc4963b5ae7fe0317844 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -49,7 +49,7 @@ private[spark] class SparkDeploySchedulerBackend( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sparkHome, sc.ui.appUIAddress, sc.eventLoggingInfo) + sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala index 555486830a7693a42fc1e874bdee647b5b657b58..132502b75f8cd060da712348cd719912756f3e8c 100644 --- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala +++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala @@ -23,6 +23,6 @@ import java.io.File * References a particular segment of a file (potentially the entire file), * based off an offset and a length. */ -private[spark] class FileSegment(val file: File, val offset: Long, val length : Long) { +private[spark] class FileSegment(val file: File, val offset: Long, val length: Long) { override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length) } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index f53df7fbedf39e9e6476a759cac510ee07e7fce3..b8e6e15880bf5657eeadd5fd3c5a4469b2f3ac0c 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -34,23 +34,22 @@ private[spark] class SparkUI( val sc: SparkContext, conf: SparkConf, val listenerBus: SparkListenerBus, - val appName: String, + var appName: String, val basePath: String = "") - extends Logging { + extends WebUI("SparkUI") with Logging { def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName) - def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = - this(null, conf, listenerBus, appName, basePath) + def this(listenerBus: SparkListenerBus, appName: String, basePath: String) = + this(null, new SparkConf, listenerBus, appName, basePath) // If SparkContext is not provided, assume the associated application is not live val live = sc != null val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf) - private val bindHost = Utils.localHostName() - private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) - private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt - private var serverInfo: Option[ServerInfo] = None + private val localHost = Utils.localHostName() + private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) + private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) private val storage = new BlockManagerUI(this) private val jobs = new JobProgressUI(this) @@ -77,20 +76,10 @@ private[spark] class SparkUI( // Maintain executor storage status through Spark events val storageStatusListener = new StorageStatusListener - /** Bind the HTTP server which backs this web interface */ - def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf)) - logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Spark JettyUtils", e) - System.exit(1) - } + def setAppName(name: String) { + appName = name } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - /** Initialize all components of the server */ def start() { storage.start() @@ -106,9 +95,21 @@ private[spark] class SparkUI( listenerBus.addListener(exec.listener) } - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!") - serverInfo.get.server.stop() + /** Bind to the HTTP server behind this web interface. */ + override def bind() { + try { + serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf)) + logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort)) + } catch { + case e: Exception => + logError("Failed to create Spark web UI", e) + System.exit(1) + } + } + + /** Stop the server behind this web interface. Only valid after bind(). */ + override def stop() { + super.stop() logInfo("Stopped Spark Web UI at %s".format(appUIAddress)) } @@ -117,6 +118,6 @@ private[spark] class SparkUI( } private[spark] object SparkUI { - val DEFAULT_PORT = "4040" + val DEFAULT_PORT = 4040 val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index a7b872f3445a4fc8d2ec1af3938ef7fef3db005b..2cc7582eca8a3e2fd7684bd9a7bb190050cb36ab 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -20,6 +20,25 @@ package org.apache.spark.ui import java.text.SimpleDateFormat import java.util.Date +private[spark] abstract class WebUI(name: String) { + protected var serverInfo: Option[ServerInfo] = None + + /** + * Bind to the HTTP server behind this web interface. + * Overridden implementation should set serverInfo. + */ + def bind() { } + + /** Return the actual port to which this server is bound. Only valid after bind(). */ + def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) + + /** Stop the server behind this web interface. Only valid after bind(). */ + def stop() { + assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name)) + serverInfo.get.server.stop() + } +} + /** * Utilities used throughout the web UI. */ @@ -45,6 +64,6 @@ private[spark] object WebUI { return "%.0f min".format(minutes) } val hours = minutes / 60 - return "%.1f h".format(hours) + "%.1f h".format(hours) } } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala index 23e90c34d5b33429335cf8353a4c79ab99842e7b..33df97187ea78db4016d8c78fef4bbd47f1f38f0 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala @@ -29,10 +29,11 @@ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.Page.Environment private[ui] class EnvironmentUI(parent: SparkUI) { - private val appName = parent.appName private val basePath = parent.basePath private var _listener: Option[EnvironmentListener] = None + private def appName = parent.appName + lazy val listener = _listener.get def start() { diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 031ed88a493a8305515e828eccb7f727d43c90e3..77a38a1d3aa7cb44a53f3699e8dd59d1cc0bcc40 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -33,10 +33,11 @@ import org.apache.spark.ui.{SparkUI, UIUtils} import org.apache.spark.util.Utils private[ui] class ExecutorsUI(parent: SparkUI) { - private val appName = parent.appName private val basePath = parent.basePath private var _listener: Option[ExecutorsListener] = None + private def appName = parent.appName + lazy val listener = _listener.get def start() { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 70d62b66a482980b29090cd5bc642350621ad5bf..f811aff616bcf7651d5f336860ea3158967eb70c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -27,13 +27,14 @@ import org.apache.spark.ui.UIUtils /** Page showing list of all ongoing and recently finished stages and pools */ private[ui] class IndexPage(parent: JobProgressUI) { - private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc private lazy val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler + private def appName = parent.appName + def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStages = listener.activeStages.values.toSeq diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala index b2c67381cc3daf8208902563ee067ace6a976eb6..ad1a12cdc4e369c37ec6dbeb9bbcff7d83483491 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala @@ -29,7 +29,6 @@ import org.apache.spark.util.Utils /** Web UI showing progress status of all jobs in the given SparkContext. */ private[ui] class JobProgressUI(parent: SparkUI) { - val appName = parent.appName val basePath = parent.basePath val live = parent.live val sc = parent.sc @@ -42,6 +41,8 @@ private[ui] class JobProgressUI(parent: SparkUI) { private val poolPage = new PoolPage(this) private var _listener: Option[JobProgressListener] = None + def appName = parent.appName + def start() { val conf = if (live) sc.conf else new SparkConf _listener = Some(new JobProgressListener(conf)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index bd33182b700596b89255e522bf9eb08b1701af9a..3638e6035ba818191201a7dd4f331b9a3503c483 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -27,12 +27,13 @@ import org.apache.spark.ui.UIUtils /** Page showing specific pool details */ private[ui] class PoolPage(parent: JobProgressUI) { - private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc private lazy val listener = parent.listener + private def appName = parent.appName + def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val poolName = request.getParameter("poolname") diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 0c55f2ee7e944cb294f610d2a1223594dea2062d..0bcbd7461cc5b220492575e96ee6c019a15fc45f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -28,10 +28,11 @@ import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: JobProgressUI) { - private val appName = parent.appName private val basePath = parent.basePath private lazy val listener = parent.listener + private def appName = parent.appName + def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val stageId = request.getParameter("id").toInt diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala index a7b24ff695214cc1299e053dab10411f088a39ac..16996a2da1e72a4ffc66b2ffe33a12cefd562310 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala @@ -30,7 +30,6 @@ import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils} /** Web UI showing storage status of all RDD's in the given SparkContext. */ private[ui] class BlockManagerUI(parent: SparkUI) { - val appName = parent.appName val basePath = parent.basePath private val indexPage = new IndexPage(this) @@ -39,6 +38,8 @@ private[ui] class BlockManagerUI(parent: SparkUI) { lazy val listener = _listener.get + def appName = parent.appName + def start() { _listener = Some(new BlockManagerListener(parent.storageStatusListener)) } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index 0fa461e5e9d27d63134aadf53639f942526b473c..4f6acc30a88c4d5ed3d08cd8067386ee795840d8 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -28,10 +28,11 @@ import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ private[ui] class IndexPage(parent: BlockManagerUI) { - private val appName = parent.appName private val basePath = parent.basePath private lazy val listener = parent.listener + private def appName = parent.appName + def render(request: HttpServletRequest): Seq[Node] = { val rdds = listener.rddInfoList diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 3f42eba4ece0060c56ab56b803e0167fa7c2d1e3..75ee9976d7b5f4d69eb756b4e0eb800444cc00c0 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -28,10 +28,11 @@ import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ private[ui] class RDDPage(parent: BlockManagerUI) { - private val appName = parent.appName private val basePath = parent.basePath private lazy val listener = parent.listener + private def appName = parent.appName + def render(request: HttpServletRequest): Seq[Node] = { val rddId = request.getParameter("id").toInt val storageStatusList = listener.storageStatusList diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index b5f2ec6831d26e2149609ddcaf0b62f8ba59dd5d..0080a8b342b0581b7a9f34381b46bf32bd438820 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -49,7 +49,7 @@ private[spark] class FileLogger( } private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - private var fileIndex = 0 + var fileIndex = 0 // Only used if compression is enabled private lazy val compressionCodec = CompressionCodec.createCodec(conf) @@ -57,10 +57,9 @@ private[spark] class FileLogger( // Only defined if the file system scheme is not local private var hadoopDataStream: Option[FSDataOutputStream] = None - private var writer: Option[PrintWriter] = { - createLogDir() - Some(createWriter()) - } + private var writer: Option[PrintWriter] = None + + createLogDir() /** * Create a logging directory with the given path. @@ -84,8 +83,8 @@ private[spark] class FileLogger( /** * Create a new writer for the file identified by the given path. */ - private def createWriter(): PrintWriter = { - val logPath = logDir + "/" + fileIndex + private def createWriter(fileName: String): PrintWriter = { + val logPath = logDir + "/" + fileName val uri = new URI(logPath) /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). @@ -147,13 +146,17 @@ private[spark] class FileLogger( } /** - * Start a writer for a new file if one does not already exit. + * Start a writer for a new file, closing the existing one if it exists. + * @param fileName Name of the new file, defaulting to the file index if not provided. */ - def start() { - writer.getOrElse { - fileIndex += 1 - writer = Some(createWriter()) + def newFile(fileName: String = "") { + fileIndex += 1 + writer.foreach(_.close()) + val name = fileName match { + case "" => fileIndex.toString + case _ => fileName } + writer = Some(createWriter(name)) } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 19654892bf661966d1041e98d5c2b01d554b6b12..d990fd49ef834af4a451b32a9e728ba19764eacc 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -62,6 +62,10 @@ private[spark] object JsonProtocol { blockManagerRemovedToJson(blockManagerRemoved) case unpersistRDD: SparkListenerUnpersistRDD => unpersistRDDToJson(unpersistRDD) + case applicationStart: SparkListenerApplicationStart => + applicationStartToJson(applicationStart) + case applicationEnd: SparkListenerApplicationEnd => + applicationEndToJson(applicationEnd) // Not used, but keeps compiler happy case SparkListenerShutdown => JNothing @@ -157,6 +161,18 @@ private[spark] object JsonProtocol { ("RDD ID" -> unpersistRDD.rddId) } + def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = { + ("Event" -> Utils.getFormattedClassName(applicationStart)) ~ + ("App Name" -> applicationStart.appName) ~ + ("Timestamp" -> applicationStart.time) ~ + ("User" -> applicationStart.sparkUser) + } + + def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = { + ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~ + ("Timestamp" -> applicationEnd.time) + } + /** ------------------------------------------------------------------- * * JSON serialization methods for classes SparkListenerEvents depend on | @@ -346,6 +362,8 @@ private[spark] object JsonProtocol { val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded) val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved) val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD) + val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart) + val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd) (json \ "Event").extract[String] match { case `stageSubmitted` => stageSubmittedFromJson(json) @@ -359,6 +377,8 @@ private[spark] object JsonProtocol { case `blockManagerAdded` => blockManagerAddedFromJson(json) case `blockManagerRemoved` => blockManagerRemovedFromJson(json) case `unpersistRDD` => unpersistRDDFromJson(json) + case `applicationStart` => applicationStartFromJson(json) + case `applicationEnd` => applicationEndFromJson(json) } } @@ -430,6 +450,17 @@ private[spark] object JsonProtocol { SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int]) } + def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = { + val appName = (json \ "App Name").extract[String] + val time = (json \ "Timestamp").extract[Long] + val sparkUser = (json \ "User").extract[String] + SparkListenerApplicationStart(appName, time, sparkUser) + } + + def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = { + SparkListenerApplicationEnd((json \ "Timestamp").extract[Long]) + } + /** --------------------------------------------------------------------- * * JSON deserialization methods for classes SparkListenerEvents depend on | diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 59da51f3e0297af479e9eb8ccce3d0647b9def96..166f48ce7342e01a1025f3fc0ed5dac8a6a84a26 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -26,7 +26,6 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.SortedSet import scala.io.Source import scala.reflect.ClassTag @@ -1022,4 +1021,11 @@ private[spark] object Utils extends Logging { def getHadoopFileSystem(path: URI): FileSystem = { FileSystem.get(path, SparkHadoopUtil.get.newConfiguration()) } + + /** + * Return a Hadoop FileSystem with the scheme encoded in the given path. + */ + def getHadoopFileSystem(path: String): FileSystem = { + getHadoopFileSystem(new URI(path)) + } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index beac656f573b44d23f88213ea6d9005cf79e2a5e..8c06a2d9aa4ab4e394b585445136608c4a6c28d5 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success} +import org.apache.spark.{LocalSparkContext, SparkConf, Success} import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.util.Utils diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 0342a8aff3c286bf244acda7c7f116cbe3f01b25..f75297a02dc8b264142c0981ebd2ffe9d2f86235 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util -import java.util.{Properties, UUID} +import java.util.Properties import scala.collection.Map @@ -52,6 +52,8 @@ class JsonProtocolSuite extends FunSuite { val blockManagerRemoved = SparkListenerBlockManagerRemoved( BlockManagerId("Scarce", "to be counted...", 100, 200)) val unpersistRdd = SparkListenerUnpersistRDD(12345) + val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield") + val applicationEnd = SparkListenerApplicationEnd(42L) testEvent(stageSubmitted, stageSubmittedJsonString) testEvent(stageCompleted, stageCompletedJsonString) @@ -64,6 +66,8 @@ class JsonProtocolSuite extends FunSuite { testEvent(blockManagerAdded, blockManagerAddedJsonString) testEvent(blockManagerRemoved, blockManagerRemovedJsonString) testEvent(unpersistRdd, unpersistRDDJsonString) + testEvent(applicationStart, applicationStartJsonString) + testEvent(applicationEnd, applicationEndJsonString) } test("Dependent Classes") { @@ -208,7 +212,13 @@ class JsonProtocolSuite extends FunSuite { case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) => assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) => - assert(e1.rddId === e2.rddId) + assert(e1.rddId == e2.rddId) + case (e1: SparkListenerApplicationStart, e2: SparkListenerApplicationStart) => + assert(e1.appName == e2.appName) + assert(e1.time == e2.time) + assert(e1.sparkUser == e2.sparkUser) + case (e1: SparkListenerApplicationEnd, e2: SparkListenerApplicationEnd) => + assert(e1.time == e2.time) case (SparkListenerShutdown, SparkListenerShutdown) => case _ => fail("Events don't match in types!") } @@ -553,4 +563,14 @@ class JsonProtocolSuite extends FunSuite { {"Event":"SparkListenerUnpersistRDD","RDD ID":12345} """ + private val applicationStartJsonString = + """ + {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42, + "User":"Garfield"} + """ + + private val applicationEndJsonString = + """ + {"Event":"SparkListenerApplicationEnd","Timestamp":42} + """ } diff --git a/docs/monitoring.md b/docs/monitoring.md index 15bfb041780da34f776690ea79098c10ab5ff4a4..4c91c3a5929bfad6fc1e2a3d06aaf24591f0d433 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -12,17 +12,77 @@ displays useful information about the application. This includes: * A list of scheduler stages and tasks * A summary of RDD sizes and memory usage -* Information about the running executors * Environmental information. +* Information about the running executors You can access this interface by simply opening `http://<driver-node>:4040` in a web browser. -If multiple SparkContexts are running on the same host, they will bind to succesive ports +If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc). -Spark's Standalone Mode cluster manager also has its own -[web UI](spark-standalone.html#monitoring-and-logging). +Note that this information is only available for the duration of the application by default. +To view the web UI after the fact, set `spark.eventLog.enabled` to true before starting the +application. This configures Spark to log Spark events that encode the information displayed +in the UI to persisted storage. -Note that in both of these UIs, the tables are sortable by clicking their headers, +## Viewing After the Fact + +Spark's Standalone Mode cluster manager also has its own +[web UI](spark-standalone.html#monitoring-and-logging). If an application has logged events over +the course of its lifetime, then the Standalone master's web UI will automatically re-render the +application's UI after the application has finished. + +If Spark is run on Mesos or YARN, it is still possible to reconstruct the UI of a finished +application through Spark's history server, provided that the application's event logs exist. +You can start a the history server by executing: + + ./sbin/start-history-server.sh <base-logging-directory> + +The base logging directory must be supplied, and should contain sub-directories that each +represents an application's event logs. This creates a web interface at +`http://<server-url>:18080` by default. The history server depends on the following variables: + +<table class="table"> + <tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr> + <tr> + <td><code>SPARK_DAEMON_MEMORY</code></td> + <td>Memory to allocate to the history server. (default: 512m).</td> + </tr> + <tr> + <td><code>SPARK_DAEMON_JAVA_OPTS</code></td> + <td>JVM options for the history server (default: none).</td> + </tr> +</table> + +Further, the history server can be configured as follows: + +<table class="table"> + <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> + <tr> + <td>spark.history.updateInterval</td> + <td>10</td> + <td> + The period, in seconds, at which information displayed by this history server is updated. + Each update checks for any changes made to the event logs in persisted storage. + </td> + </tr> + <tr> + <td>spark.history.retainedApplications</td> + <td>250</td> + <td> + The number of application UIs to retain. If this cap is exceeded, then the oldest + applications will be removed. + </td> + </tr> + <tr> + <td>spark.history.ui.port</td> + <td>18080</td> + <td> + The port to which the web interface of the history server binds. + </td> + </tr> +</table> + +Note that in all of these UIs, the tables are sortable by clicking their headers, making it easy to identify slow tasks, data skew, etc. # Metrics diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala index 3ebf288130fb63e1fba921fdb6e8dba063046263..910b31d209e13b978856f7a5cef8583170fa9174 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala @@ -116,14 +116,14 @@ trait SparkILoopInit { } } - def initializeSpark() { + def initializeSpark() { intp.beQuietDuring { command(""" @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext(); """) command("import org.apache.spark.SparkContext._") } - echo("Spark context available as sc.") + echo("Spark context available as sc.") } // code to be executed only after the interpreter is initialized diff --git a/sbin/start-history-server.sh b/sbin/start-history-server.sh new file mode 100755 index 0000000000000000000000000000000000000000..4a90c68763b68cadb22be0f2ecaeee5fdf54f8e1 --- /dev/null +++ b/sbin/start-history-server.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Starts the history server on the machine this script is executed on. +# +# Usage: start-history-server.sh <base-log-dir> [<web-ui-port>] +# Example: ./start-history-server.sh --dir /tmp/spark-events --port 18080 +# + +sbin=`dirname "$0"` +sbin=`cd "$sbin"; pwd` + +if [ $# -lt 1 ]; then + echo "Usage: ./start-history-server.sh <base-log-dir>" + echo "Example: ./start-history-server.sh /tmp/spark-events" + exit +fi + +LOG_DIR=$1 + +"$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 --dir "$LOG_DIR" diff --git a/sbin/stop-history-server.sh b/sbin/stop-history-server.sh new file mode 100755 index 0000000000000000000000000000000000000000..c0034ad641cbe1fdf44cb1e5ec0807d8c9103a8f --- /dev/null +++ b/sbin/stop-history-server.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Stops the history server on the machine this script is executed on. + +sbin=`dirname "$0"` +sbin=`cd "$sbin"; pwd` + +"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.history.HistoryServer 1