diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 2d1609b973607336ae77527bf1ff3c9c1135d629..82a54dbfb53300b13be943fa9ea6b60c0eae29d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -29,22 +29,27 @@ import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 
+/**
+ * A class that provides application history from event logs stored in the file system.
+ * This provider checks for new finished applications in the background periodically and
+ * renders the history application UI by parsing the associated event logs.
+ */
 private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   with Logging {
 
+  import FsHistoryProvider._
+
   private val NOT_STARTED = "<Not Started>"
 
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
     conf.getInt("spark.history.updateInterval", 10)) * 1000
 
-  private val logDir = conf.get("spark.history.fs.logDirectory", null)
-  private val resolvedLogDir = Option(logDir)
-    .map { d => Utils.resolveURI(d) }
-    .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
+  private val logDir = conf.getOption("spark.history.fs.logDirectory")
+    .map { d => Utils.resolveURI(d).toString }
+    .getOrElse(DEFAULT_LOG_DIR)
 
-  private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
-    SparkHadoopUtil.get.newConfiguration(conf))
+  private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf))
 
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L
@@ -87,14 +92,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   private def initialize() {
     // Validate the log directory.
-    val path = new Path(resolvedLogDir)
+    val path = new Path(logDir)
     if (!fs.exists(path)) {
-      throw new IllegalArgumentException(
-        "Logging directory specified does not exist: %s".format(resolvedLogDir))
+      var msg = s"Log directory specified does not exist: $logDir."
+      if (logDir == DEFAULT_LOG_DIR) {
+        msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+      }
+      throw new IllegalArgumentException(msg)
     }
     if (!fs.getFileStatus(path).isDir) {
       throw new IllegalArgumentException(
-        "Logging directory specified is not a directory: %s".format(resolvedLogDir))
+        "Logging directory specified is not a directory: %s".format(logDir))
     }
 
     checkForLogs()
@@ -134,8 +142,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
   }
 
-  override def getConfig(): Map[String, String] =
-    Map("Event Log Location" -> resolvedLogDir.toString)
+  override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
 
   /**
    * Builds the application list based on the current contents of the log directory.
@@ -146,7 +153,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     lastLogCheckTimeMs = getMonotonicTimeMs()
     logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
     try {
-      val logStatus = fs.listStatus(new Path(resolvedLogDir))
+      val logStatus = fs.listStatus(new Path(logDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
 
       // Load all new logs from the log directory. Only directories that have a modification time
@@ -244,6 +251,10 @@
 
 }
 
+private object FsHistoryProvider {
+  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+}
+
 private class FsApplicationHistoryInfo(
     val logDir: String,
     id: String,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 0e249e51a77d8d47969512f8e845d0b4289a33da..5fdc350cd851210eabb4024d27593937b0e4cdc7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -58,7 +58,13 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
             </h4> ++
             appTable
           } else {
-            <h4>No Completed Applications Found</h4>
+            <h4>No completed applications found!</h4> ++
+            <p>Did you specify the correct logging directory?
+              Please verify your setting of <span style="font-style:italic">
+              spark.history.fs.logDirectory</span> and whether you have the permissions to
+              access it.<br /> It is also possible that your application did not run to
+              completion or did not stop the SparkContext.
+            </p>
           }
         }
       </div>
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 5bce32a04d16dfd88b6d97ca7d4ebd5fa98a4d19..b1270ade9f7507884f26c288b1c4c2ac373cd393 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.deploy.history
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 
 /**
  * Command-line parser for the master.
  */
-private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
-  private var logDir: String = null
+private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) extends Logging {
   private var propertiesFile: String = null
 
   parse(args.toList)
@@ -32,7 +31,8 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
   private def parse(args: List[String]): Unit = {
     args match {
       case ("--dir" | "-d") :: value :: tail =>
-        logDir = value
+        logWarning("Setting log directory through the command line is deprecated as of " +
+          "Spark 1.1.0. Please set this through spark.history.fs.logDirectory instead.")
         conf.set("spark.history.fs.logDirectory", value)
         System.setProperty("spark.history.fs.logDirectory", value)
         parse(tail)
@@ -78,9 +78,10 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
      |                  (default 50)
      |FsHistoryProvider options:
      |
-     |  spark.history.fs.logDirectory      Directory where app logs are stored (required)
-     |  spark.history.fs.updateInterval    How often to reload log data from storage (in seconds,
-     |                                     default 10)
+     |  spark.history.fs.logDirectory      Directory where app logs are stored
+     |                                     (default: file:/tmp/spark-events)
+     |  spark.history.fs.updateInterval    How often to reload log data from storage
+     |                                     (in seconds, default: 10)
      |""".stripMargin)
     System.exit(exitCode)
   }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index e3f81a76acdbbee8ce5376b29fc55ffde316148b..f32cdef240d313a2b3d4da51f72dc4d05e6c7de5 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -79,7 +79,7 @@ follows:
   </tr>
   <tr>
     <td>spark.history.fs.logDirectory</td>
-    <td>(none)</td>
+    <td>file:/tmp/spark-events</td>
     <td>
       Directory that contains application event logs to be loaded by the history server
     </td>
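
The behavioral core of this patch is the resolve-then-validate flow in `FsHistoryProvider.initialize()`: fall back to `DEFAULT_LOG_DIR` when `spark.history.fs.logDirectory` is unset, and append the configuration hint to the error message only when the default is in play, since an explicitly configured directory that is missing is a different mistake than never setting the option at all. The sketch below restates that flow as a minimal, self-contained Scala program; `LogDirResolution` and `resolveAndValidate` are illustrative names, and `java.nio.file` stands in for Hadoop's `FileSystem`, so this is a sketch of the pattern rather than Spark code.

```scala
import java.nio.file.{Files, Paths}

object LogDirResolution {
  // Mirrors FsHistoryProvider.DEFAULT_LOG_DIR in the patch above.
  val DefaultLogDir = "file:/tmp/spark-events"

  /**
   * Resolve the configured log directory, falling back to the default, then
   * validate it the way initialize() does: only a missing *default* directory
   * gets the hint about spark.history.fs.logDirectory, because in that case
   * the user most likely never set the option at all.
   */
  def resolveAndValidate(configured: Option[String]): String = {
    val logDir = configured.getOrElse(DefaultLogDir)
    val path = Paths.get(logDir.stripPrefix("file:"))
    if (!Files.exists(path)) {
      var msg = s"Log directory specified does not exist: $logDir."
      if (logDir == DefaultLogDir) {
        msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
      }
      throw new IllegalArgumentException(msg)
    }
    if (!Files.isDirectory(path)) {
      throw new IllegalArgumentException(
        s"Logging directory specified is not a directory: $logDir")
    }
    logDir
  }

  def main(args: Array[String]): Unit = {
    // An explicitly configured, existing directory resolves cleanly.
    println(resolveAndValidate(Some("/tmp")))
  }
}
```

Run with no configuration on a machine without `/tmp/spark-events`, the sketch reproduces the hint message this patch introduces; run with an explicit but missing directory, it fails without the hint. That asymmetry is the reason `DEFAULT_LOG_DIR` is compared against the resolved value rather than checking whether the option was set.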