Skip to content
Snippets Groups Projects
Commit a7c305b8 authored by Andrew Or's avatar Andrew Or Committed by Patrick Wendell
Browse files

[SPARK-2340] Resolve event logging and History Server paths properly

We resolve relative paths to the local `file:/` system for `--jars` and `--files` in spark submit (#853). We should do the same for the history server.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1280 from andrewor14/hist-serv-fix and squashes the following commits:

13ff406 [Andrew Or] Merge branch 'master' of github.com:apache/spark into hist-serv-fix
b393e17 [Andrew Or] Strip trailing "/" from logging directory
622a471 [Andrew Or] Fix test in EventLoggingListenerSuite
0e20f71 [Andrew Or] Shift responsibility of resolving paths up one level
b037c0c [Andrew Or] Use resolved paths for everything in history server
c7e36ee [Andrew Or] Resolve paths for event logging too
40e3933 [Andrew Or] Resolve history server file paths
parent 118c1c42
No related branches found
No related tags found
No related merge requests found
......@@ -36,11 +36,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
conf.getInt("spark.history.updateInterval", 10)) * 1000
private val logDir = conf.get("spark.history.fs.logDirectory", null)
if (logDir == null) {
throw new IllegalArgumentException("Logging directory must be specified.")
}
private val resolvedLogDir = Option(logDir)
.map { d => Utils.resolveURI(d) }
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }
private val fs = Utils.getHadoopFileSystem(logDir)
private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
......@@ -76,14 +76,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def initialize() {
// Validate the log directory.
val path = new Path(logDir)
val path = new Path(resolvedLogDir)
if (!fs.exists(path)) {
throw new IllegalArgumentException(
"Logging directory specified does not exist: %s".format(logDir))
"Logging directory specified does not exist: %s".format(resolvedLogDir))
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
"Logging directory specified is not a directory: %s".format(resolvedLogDir))
}
checkForLogs()
......@@ -95,15 +95,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
override def getAppUI(appId: String): SparkUI = {
try {
val appLogDir = fs.getFileStatus(new Path(logDir, appId))
loadAppInfo(appLogDir, true)._2
val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
ui
} catch {
case e: FileNotFoundException => null
}
}
override def getConfig(): Map[String, String] =
Map(("Event Log Location" -> logDir))
Map("Event Log Location" -> resolvedLogDir.toString)
/**
* Builds the application list based on the current contents of the log directory.
......@@ -114,14 +115,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
lastLogCheckTimeMs = getMonotonicTimeMs()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val logStatus = fs.listStatus(new Path(logDir))
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val logInfos = logDirs.filter {
dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
val logInfos = logDirs.filter { dir =>
fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
}
val currentApps = Map[String, ApplicationHistoryInfo](
appList.map(app => (app.id -> app)):_*)
appList.map(app => app.id -> app):_*)
// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
......@@ -131,7 +132,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
try {
newApps += loadAppInfo(dir, false)._1
val (app, _) = loadAppInfo(dir, renderUI = false)
newApps += app
} catch {
case e: Exception => logError(s"Failed to load app info from directory $dir.")
}
......@@ -159,9 +161,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
* @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
*/
private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
val path = logDir.getPath
val appId = path.getName
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
......
......@@ -45,7 +45,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
{ providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
if (allApps.size > 0) {
......
......@@ -25,9 +25,9 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}
import org.apache.spark.util.SignalLogger
/**
* A web server that renders SparkUIs of completed applications.
......@@ -177,7 +177,7 @@ object HistoryServer extends Logging {
def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
val args = new HistoryServerArguments(conf, argStrings)
new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
val providerName = conf.getOption("spark.history.provider")
......
......@@ -18,7 +18,6 @@
package org.apache.spark.deploy.history
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
/**
* Command-line parser for the master.
......@@ -32,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
args match {
case ("--dir" | "-d") :: value :: tail =>
logDir = value
conf.set("spark.history.fs.logDirectory", value)
parse(tail)
case ("--help" | "-h") :: tail =>
......@@ -42,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case _ =>
printUsageAndExit(1)
}
if (logDir != null) {
conf.set("spark.history.fs.logDirectory", logDir)
}
}
private def printUsageAndExit(exitCode: Int) {
......
......@@ -29,7 +29,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{FileLogger, JsonProtocol}
import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
/**
* A SparkListener that logs events to persistent storage.
......@@ -55,7 +55,7 @@ private[spark] class EventLoggingListener(
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
val logDir = logBaseDir + "/" + name
val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
......@@ -215,7 +215,7 @@ private[spark] object EventLoggingListener extends Logging {
} catch {
case e: Exception =>
logError("Exception in parsing logging info from directory %s".format(logDir), e)
EventLoggingInfo.empty
EventLoggingInfo.empty
}
}
......
......@@ -52,7 +52,7 @@ private[spark] class FileLogger(
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
private val fileSystem = Utils.getHadoopFileSystem(logDir)
var fileIndex = 0
// Only used if compression is enabled
......
......@@ -259,7 +259,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get
val expectedLogDir = logDirPath.toString
assert(eventLogger.logDir.startsWith(expectedLogDir))
assert(eventLogger.logDir.contains(expectedLogDir))
// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment