diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3cd0c218a36fd8562607f3e9dbfb0ddf6175a16d..e231e8369dbacb6fc8bef648af6915c66185bb54 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       None
     }
   }
+  private[spark] val eventLogCodec: Option[String] = {
+    val compress = conf.getBoolean("spark.eventLog.compress", false)
+    if (compress && isEventLogEnabled) {
+      Some(CompressionCodec.getShortName(CompressionCodec.getCodecName(conf)))
+    } else {
+      None
+    }
+  }
 
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
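
The codec name is resolved once, up front, so that its short form can travel with the
ApplicationDescription (next file) and later be embedded in the log file name. A minimal
sketch of that resolution, assuming spark-internal code (the CompressionCodec object is
private[spark]) and a hypothetical configuration:

    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.compress", "true")
      .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")

    // getCodecName falls back to DEFAULT_COMPRESSION_CODEC ("snappy") when the key
    // is unset; getShortName maps a fully-qualified class name back to its alias.
    CompressionCodec.getShortName(CompressionCodec.getCodecName(conf))  // "lzf"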
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index ae55b4ff40b74a5edaab80a7fc93747b1c206a61..3d0d68de8f495d5e9041dc54b812816fd489ce56 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None)
+    val eventLogDir: Option[String] = None,
+    // Short name of the compression codec used when writing event logs, if any (e.g. lzf)
+    val eventLogCodec: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
@@ -34,8 +36,10 @@ private[spark] class ApplicationDescription(
       memoryPerSlave: Int = memoryPerSlave,
       command: Command = command,
       appUiUrl: String = appUiUrl,
-      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
-    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+      eventLogDir: Option[String] = eventLogDir,
+      eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
+    new ApplicationDescription(
+      name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c5fab1d44025045dc427cc2ff9364490014f6d9a..16d88c17d1a768eef688225ac0806366ab5a03b2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,8 +83,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
-  private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
-  private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+  private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+  private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
@@ -324,7 +324,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
-    val (logInput, sparkVersion) =
+    val logInput =
       if (isLegacyLogDirectory(eventLog)) {
         openLegacyEventLog(logPath)
       } else {
@@ -333,7 +333,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val appListener = new ApplicationEventListener
       bus.addListener(appListener)
-      bus.replay(logInput, sparkVersion, logPath.toString)
+      bus.replay(logInput, logPath.toString)
       new FsApplicationHistoryInfo(
         logPath.getName(),
         appListener.appId.getOrElse(logPath.getName()),
@@ -353,30 +353,24 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * log file (along with other metadata files), which is the case for directories generated by
    * the code in previous releases.
    *
-   * @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
+   * @return input stream that holds one JSON record per line.
    */
-  private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
+  private[history] def openLegacyEventLog(dir: Path): InputStream = {
     val children = fs.listStatus(dir)
     var eventLogPath: Path = null
     var codecName: Option[String] = None
-    var sparkVersion: String = null
 
     children.foreach { child =>
       child.getPath().getName() match {
         case name if name.startsWith(LOG_PREFIX) =>
           eventLogPath = child.getPath()
-
         case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
           codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
-
-        case version if version.startsWith(SPARK_VERSION_PREFIX) =>
-          sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())
-
         case _ =>
       }
     }
 
-    if (eventLogPath == null || sparkVersion == null) {
+    if (eventLogPath == null) {
       throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
     }
 
@@ -388,7 +382,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       }
 
     val in = new BufferedInputStream(fs.open(eventLogPath))
-    (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+    codec.map(_.compressedInputStream(in)).getOrElse(in)
   }
 
   /**
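
Legacy (Spark 1.0.0-format) log directories still advertise their codec through a marker
file name rather than a file extension. A standalone sketch of the scan above, with
hypothetical directory entries; the SPARK_VERSION_ marker is still present in old logs but
no longer required:

    val entries = Seq("EVENT_LOG_1", "SPARK_VERSION_1.0.0",
      "COMPRESSION_CODEC_lzf", "APPLICATION_COMPLETE")
    val codecName = entries
      .find(_.startsWith("COMPRESSION_CODEC_"))
      .map(_.substring("COMPRESSION_CODEC_".length))  // Some("lzf")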
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 8cc6ec1e8192ca6e1999073925b9c4d3f45193da..148485cc118632bc33c9306d15b25b4767c5fa83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -737,13 +737,13 @@ private[spark] class Master(
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
     try {
       val eventLogFile = app.desc.eventLogDir
-        .map { dir => EventLoggingListener.getLogPath(dir, app.id) }
+        .map { dir => EventLoggingListener.getLogPath(dir, app.id, app.desc.eventLogCodec) }
         .getOrElse {
           // Event logging is not enabled for this application
           app.desc.appUiUrl = notFoundBasePath
           return false
         }
-        
+
       val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
 
       if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
@@ -756,12 +756,12 @@ private[spark] class Master(
         return false
       }
 
-      val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
+      val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
         appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       try {
-        replayBus.replay(logInput, sparkVersion, eventLogFile)
+        replayBus.replay(logInput, eventLogFile)
       } finally {
         logInput.close()
       }
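
Because the codec is now part of the file name, the Master must pass it along when it
reconstructs the log path; otherwise it would probe for a file without the extension and
conclude the log is missing. A sketch with a hypothetical directory and app ID
(spark-internal API):

    import org.apache.spark.scheduler.EventLoggingListener

    EventLoggingListener.getLogPath("/spark-events", "app-0001", Some("lzf"))
    // "file:/spark-events/app-0001.lzf"
    EventLoggingListener.getLogPath("/spark-events", "app-0001")
    // "file:/spark-events/app-0001" -- a different path, hence the new argument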
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index f856890d279f43181a5b189a2665197af6dce90d..0709b6d689e868063538ef4ef0d043c8ea46823b 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -26,7 +26,6 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
-import org.apache.spark.Logging
 
 /**
  * :: DeveloperApi ::
@@ -53,8 +52,12 @@ private[spark] object CompressionCodec {
     "lzf" -> classOf[LZFCompressionCodec].getName,
     "snappy" -> classOf[SnappyCompressionCodec].getName)
 
+  def getCodecName(conf: SparkConf): String = {
+    conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
+  }
+
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
+    createCodec(conf, getCodecName(conf))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
@@ -71,6 +74,20 @@ private[spark] object CompressionCodec {
       s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
   }
 
+  /**
+   * Return the short version of the given codec name.
+   * If it is already a short name, just return it.
+   */
+  def getShortName(codecName: String): String = {
+    if (shortCompressionCodecNames.contains(codecName)) {
+      codecName
+    } else {
+      shortCompressionCodecNames
+        .collectFirst { case (k, v) if v == codecName => k }
+        .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
+    }
+  }
+
   val FALLBACK_COMPRESSION_CODEC = "lzf"
   val DEFAULT_COMPRESSION_CODEC = "snappy"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
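
getShortName is the inverse of the short-name table consulted by createCodec: it accepts
either form and normalizes to the alias that is safe to embed in a file name. A behavior
sketch (the object is private[spark], and the last class name is made up):

    import org.apache.spark.io.CompressionCodec

    CompressionCodec.getShortName("lzf")                                          // "lzf"
    CompressionCodec.getShortName("org.apache.spark.io.SnappyCompressionCodec")  // "snappy"
    CompressionCodec.getShortName("com.example.MyCodec")  // IllegalArgumentException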
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 30075c172bdb1d35b955c29700a4484520c42e33..2091a9fe8d0d305e5067e3c42a0c870ebd539572 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -62,6 +62,15 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+  private val compressionCodec =
+    if (shouldCompress) {
+      Some(CompressionCodec.createCodec(sparkConf))
+    } else {
+      None
+    }
+  private val compressionCodecName = compressionCodec.map { c =>
+    CompressionCodec.getShortName(c.getClass.getName)
+  }
 
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
@@ -80,7 +89,7 @@ private[spark] class EventLoggingListener(
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   // Visible for tests only.
-  private[scheduler] val logPath = getLogPath(logBaseDir, appId)
+  private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
 
   /**
    * Creates the log file in the configured log directory.
@@ -111,19 +120,19 @@ private[spark] class EventLoggingListener(
         hadoopDataStream.get
       }
 
-    val compressionCodec =
-      if (shouldCompress) {
-        Some(CompressionCodec.createCodec(sparkConf))
-      } else {
-        None
-      }
-
-    fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
-    val logStream = initEventLog(new BufferedOutputStream(dstream, outputBufferSize),
-      compressionCodec)
-    writer = Some(new PrintWriter(logStream))
+    try {
+      val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+      val bstream = new BufferedOutputStream(cstream, outputBufferSize)
 
-    logInfo("Logging events to %s".format(logPath))
+      EventLoggingListener.initEventLog(bstream)
+      fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
+      writer = Some(new PrintWriter(bstream))
+      logInfo("Logging events to %s".format(logPath))
+    } catch {
+      case e: Exception =>
+        dstream.close()
+        throw e
+    }
   }
 
   /** Log the event as JSON. */
@@ -201,77 +210,57 @@ private[spark] object EventLoggingListener extends Logging {
   // Suffix applied to the names of files still being written by applications.
   val IN_PROGRESS = ".inprogress"
   val DEFAULT_LOG_DIR = "/tmp/spark-events"
+  val SPARK_VERSION_KEY = "SPARK_VERSION"
+  val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"
 
   private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
-  // Marker for the end of header data in a log file. After this marker, log data, potentially
-  // compressed, will be found.
-  private val HEADER_END_MARKER = "=== LOG_HEADER_END ==="
-
-  // To avoid corrupted files causing the heap to fill up. Value is arbitrary.
-  private val MAX_HEADER_LINE_LENGTH = 4096
-
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
 
   /**
-   * Write metadata about the event log to the given stream.
-   *
-   * The header is a serialized version of a map, except it does not use Java serialization to
-   * avoid incompatibilities between different JDKs. It writes one map entry per line, in
-   * "key=value" format.
+   * Write metadata about an event log to the given stream.
+   * The metadata is encoded in the first line of the event log as JSON.
    *
-   * The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code
-   * can know when to stop.
-   *
-   * The format needs to be kept in sync with the openEventLog() method below. Also, it cannot
-   * change in new Spark versions without some other way of detecting the change (like some
-   * metadata encoded in the file name).
-   *
-   * @param logStream Raw output stream to the even log file.
-   * @param compressionCodec Optional compression codec to use.
-   * @return A stream where to write event log data. This may be a wrapper around the original
-   *         stream (for example, when compression is enabled).
+   * @param logStream Raw output stream to the event log file.
    */
-  def initEventLog(
-      logStream: OutputStream,
-      compressionCodec: Option[CompressionCodec]): OutputStream = {
-    val meta = mutable.HashMap(("version" -> SPARK_VERSION))
-    compressionCodec.foreach { codec =>
-      meta += ("compressionCodec" -> codec.getClass().getName())
-    }
-
-    def write(entry: String) = {
-      val bytes = entry.getBytes(Charsets.UTF_8)
-      if (bytes.length > MAX_HEADER_LINE_LENGTH) {
-        throw new IOException(s"Header entry too long: ${entry}")
-      }
-      logStream.write(bytes, 0, bytes.length)
-    }
-
-    meta.foreach { case (k, v) => write(s"$k=$v\n") }
-    write(s"$HEADER_END_MARKER\n")
-    compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
+  def initEventLog(logStream: OutputStream): Unit = {
+    val metadata = SparkListenerLogStart(SPARK_VERSION)
+    val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n"
+    logStream.write(metadataJson.getBytes(Charsets.UTF_8))
   }
 
   /**
    * Return a file-system-safe path to the log file for the given application.
    *
+   * Note that because we currently only create a single log file for each application,
+   * we must encode all the information needed to parse this event log in the file name
+   * instead of within the file itself. Otherwise, if the file is compressed, for instance,
+   * we won't know which codec to use to decompress the metadata needed to open the file in
+   * the first place.
+   *
    * @param logBaseDir Directory where the log file will be written.
    * @param appId A unique app ID.
+   * @param compressionCodecName Name to identify the codec used to compress the contents
+   *                             of the log, or None if compression is not enabled.
    * @return A path which consists of file-system-safe characters.
    */
-  def getLogPath(logBaseDir: String, appId: String): String = {
-    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
-    Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+  def getLogPath(
+      logBaseDir: String,
+      appId: String,
+      compressionCodecName: Option[String] = None): String = {
+    val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+    // e.g. app_123, app_123.lzf
+    val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
+    Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
   }
 
   /**
-   * Opens an event log file and returns an input stream to the event data.
+   * Opens an event log file and returns an input stream that contains the event data.
    *
-   * @return 2-tuple (event input stream, Spark version of event data)
+   * @return input stream that holds one JSON record per line.
    */
-  def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = {
+  def openEventLog(log: Path, fs: FileSystem): InputStream = {
     // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
     // IOException when a file does not exist, so try our best to throw a proper exception.
     if (!fs.exists(log)) {
@@ -279,52 +268,17 @@ private[spark] object EventLoggingListener extends Logging {
     }
 
     val in = new BufferedInputStream(fs.open(log))
-    // Read a single line from the input stream without buffering.
-    // We cannot use BufferedReader because we must avoid reading
-    // beyond the end of the header, after which the content of the
-    // file may be compressed.
-    def readLine(): String = {
-      val bytes = new ByteArrayOutputStream()
-      var next = in.read()
-      var count = 0
-      while (next != '\n') {
-        if (next == -1) {
-          throw new IOException("Unexpected end of file.")
-        }
-        bytes.write(next)
-        count = count + 1
-        if (count > MAX_HEADER_LINE_LENGTH) {
-          throw new IOException("Maximum header line length exceeded.")
-        }
-        next = in.read()
-      }
-      new String(bytes.toByteArray(), Charsets.UTF_8)
+
+    // The compression codec is encoded as the file extension, e.g. app_123.lzf.
+    // Since app IDs are sanitized to exclude periods, it is safe to split on them.
+    val logName = log.getName.stripSuffix(IN_PROGRESS)
+    val codecName: Option[String] = logName.split("\\.").tail.lastOption
+    val codec = codecName.map { c =>
+      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
     }
 
-    // Parse the header metadata in the form of k=v pairs
-    // This assumes that every line before the header end marker follows this format
     try {
-      val meta = new mutable.HashMap[String, String]()
-      var foundEndMarker = false
-      while (!foundEndMarker) {
-        readLine() match {
-          case HEADER_END_MARKER =>
-            foundEndMarker = true
-          case entry =>
-            val prop = entry.split("=", 2)
-            if (prop.length != 2) {
-              throw new IllegalArgumentException("Invalid metadata in log file.")
-            }
-            meta += (prop(0) -> prop(1))
-        }
-      }
-
-      val sparkVersion = meta.get("version").getOrElse(
-        throw new IllegalArgumentException("Missing Spark version in log metadata."))
-      val codec = meta.get("compressionCodec").map { codecName =>
-        codecMap.getOrElseUpdate(codecName, CompressionCodec.createCodec(new SparkConf, codecName))
-      }
-      (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+      codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
       case e: Exception =>
         in.close()
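
The old header block is gone entirely: the reader learns the codec from the file name and
the Spark version from the first JSON line. A standalone sketch of the extension parsing
above (file names hypothetical); splitting on "." is safe because getLogPath sanitizes
periods out of app IDs:

    def codecOf(fileName: String): Option[String] = {
      val logName = fileName.stripSuffix(".inprogress")  // the IN_PROGRESS suffix
      logName.split("\\.").tail.lastOption
    }

    codecOf("app_123")                    // None
    codecOf("app_123.lzf")                // Some("lzf")
    codecOf("app_123.snappy.inprogress")  // Some("snappy")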
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d9c3a10dc54130ba4d4cacec5e3e904230cfb74a..95273c716b3e26c4e0f0a90e363c4bdc196df123 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -39,10 +39,9 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
    * error is thrown by this method.
    *
    * @param logData Stream containing event log data.
-   * @param version Spark version that generated the events.
    * @param sourceName Filename (or other source identifier) from whence @logData is being read
    */
-  def replay(logData: InputStream, version: String, sourceName: String) {
+  def replay(logData: InputStream, sourceName: String): Unit = {
     var currentLine: String = null
     var lineNumber: Int = 1
     try {
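
With the version now carried inside the log as a SparkListenerLogStart event, callers hand
replay() only a stream and a name for error messages. A sketch of a call site, assuming a
Hadoop FileSystem fs and a Path logFile pointing at a completed log:

    val in = EventLoggingListener.openEventLog(logFile, fs)
    try {
      val bus = new ReplayListenerBus()
      bus.addListener(new ApplicationEventListener)
      bus.replay(in, logFile.toString)  // no Spark version argument any more
    } finally {
      in.close()
    }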
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index dd28ddb31de1f8676559bcb5d11b9d1fb21dcc24..52720d48ca67fd72307cab22c6e0b522844d627e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -116,6 +116,11 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String],
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
+/**
+ * An internal class that describes the metadata of an event log.
+ * This event is not meant to be posted to listeners downstream.
+ */
+private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
 /**
  * :: DeveloperApi ::
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index fe8a19a2c0cb94115f99a9a44ac110daebf55700..61e69ecc083871af0a1e18a77bed980fb505261a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -58,6 +58,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
         listener.onExecutorAdded(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>
         listener.onExecutorRemoved(executorRemoved)
+      case logStart: SparkListenerLogStart => // ignore event log metadata
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a0aa555f6244fbbab41ce2cdb0b4e1e9dc6a3f0d..ffd482570575553a5ebfaf4adf70a767eb7a7a39 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -85,7 +85,7 @@ private[spark] class SparkDeploySchedulerBackend(
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      appUIAddress, sc.eventLogDir)
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e20864db5673454bfae6cbe0da601a4e162a305..474f79fb756f664fdfc3a4b0a4e0c354be760957 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -89,6 +89,8 @@ private[spark] object JsonProtocol {
         executorAddedToJson(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>
         executorRemovedToJson(executorRemoved)
+      case logStart: SparkListenerLogStart =>
+        logStartToJson(logStart)
       // These aren't used, but keeps compiler happy
       case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
     }
@@ -214,6 +216,11 @@ private[spark] object JsonProtocol {
     ("Removed Reason" -> executorRemoved.reason)
   }
 
+  def logStartToJson(logStart: SparkListenerLogStart): JValue = {
+    ("Event" -> Utils.getFormattedClassName(logStart)) ~
+    ("Spark Version" -> SPARK_VERSION)
+  }
+
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */
@@ -447,6 +454,7 @@ private[spark] object JsonProtocol {
     val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
     val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
     val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
+    val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -464,6 +472,7 @@ private[spark] object JsonProtocol {
       case `applicationEnd` => applicationEndFromJson(json)
       case `executorAdded` => executorAddedFromJson(json)
       case `executorRemoved` => executorRemovedFromJson(json)
+      case `logStart` => logStartFromJson(json)
     }
   }
 
@@ -574,6 +583,11 @@ private[spark] object JsonProtocol {
     SparkListenerExecutorRemoved(time, executorId, reason)
   }
 
+  def logStartFromJson(json: JValue): SparkListenerLogStart = {
+    val sparkVersion = (json \ "Spark Version").extract[String]
+    SparkListenerLogStart(sparkVersion)
+  }
+
   /** --------------------------------------------------------------------- *
    * JSON deserialization methods for classes SparkListenerEvents depend on |
    * ---------------------------------------------------------------------- */
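
A round-trip sketch for the new metadata line (spark-internal API; the version string is
hypothetical). The first line of every new-format log is exactly this compact JSON:

    import org.json4s.jackson.JsonMethods._
    import org.apache.spark.scheduler.SparkListenerLogStart
    import org.apache.spark.util.JsonProtocol

    val json = JsonProtocol.logStartToJson(SparkListenerLogStart("1.3.0"))
    compact(json)  // {"Event":"SparkListenerLogStart","Spark Version":"1.3.0"}
    JsonProtocol.sparkEventFromJson(json)  // SparkListenerLogStart(1.3.0)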
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 85939eaadccc7477c9b62393bab979b52c759337..e908ba604ebedbf6bf9bfc348908743aff09a4d5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.net.URI
 
 import scala.io.Source
 
-import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.Matchers
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -45,18 +44,35 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     Utils.deleteRecursively(testDir)
   }
 
+  /** Create a fake log file using the new log format used in Spark 1.3+ */
+  private def newLogFile(
+      appId: String,
+      inProgress: Boolean,
+      codec: Option[String] = None): File = {
+    val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
+    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId, codec)
+    val logPath = new URI(logUri).getPath + ip
+    new File(logPath)
+  }
+
   test("Parse new and old application logs") {
     val provider = new FsHistoryProvider(createTestConf())
 
     // Write a new-style application log.
-    val newAppComplete = new File(testDir, "new1")
+    val newAppComplete = newLogFile("new1", inProgress = false)
     writeFile(newAppComplete, true, None,
       SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
-      SparkListenerApplicationEnd(4L)
+      SparkListenerApplicationEnd(5L)
       )
 
+    // Write a new-style application log, compressed with lzf.
+    val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+    writeFile(newAppCompressedComplete, true, Some(CompressionCodec.createCodec(new SparkConf, "lzf")),
+      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+      SparkListenerApplicationEnd(4L))
+
     // Write an unfinished app, new-style.
-    val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+    val newAppIncomplete = newLogFile("new2", inProgress = true)
     writeFile(newAppIncomplete, true, None,
       SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
       )
@@ -89,16 +105,18 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
 
     val list = provider.getListing().toSeq
     list should not be (null)
-    list.size should be (4)
-    list.count(e => e.completed) should be (2)
+    list.size should be (5)
+    list.count(_.completed) should be (3)
 
-    list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
+    list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
       newAppComplete.lastModified(), "test", true))
-    list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+    list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
+      "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+    list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
       oldAppComplete.lastModified(), "test", true))
-    list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
+    list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
       -1L, oldAppIncomplete.lastModified(), "test", false))
-    list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
+    list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
       -1L, newAppIncomplete.lastModified(), "test", false))
 
     // Make sure the UI can be rendered.
@@ -127,7 +145,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
 
       val logPath = new Path(logDir.getAbsolutePath())
       try {
-        val (logInput, sparkVersion) = provider.openLegacyEventLog(logPath)
+        val logInput = provider.openLegacyEventLog(logPath)
         try {
           Source.fromInputStream(logInput).getLines().toSeq.size should be (2)
         } finally {
@@ -141,12 +159,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   }
 
   test("SPARK-3697: ignore directories that cannot be read.") {
-    val logFile1 = new File(testDir, "new1")
+    val logFile1 = newLogFile("new1", inProgress = false)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1-1", None, 1L, "test"),
       SparkListenerApplicationEnd(2L)
       )
-    val logFile2 = new File(testDir, "new2")
+    val logFile2 = newLogFile("new2", inProgress = false)
     writeFile(logFile2, true, None,
       SparkListenerApplicationStart("app1-2", None, 1L, "test"),
       SparkListenerApplicationEnd(2L)
@@ -164,7 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   test("history file is renamed from inprogress to completed") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+    val logFile1 = newLogFile("app1", inProgress = true)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
       SparkListenerApplicationEnd(2L)
@@ -174,7 +192,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     appListBeforeRename.size should be (1)
     appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
 
-    logFile1.renameTo(new File(testDir, "app1"))
+    logFile1.renameTo(newLogFile("app1", inProgress = false))
     provider.checkForLogs()
     val appListAfterRename = provider.getListing()
     appListAfterRename.size should be (1)
@@ -184,7 +202,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   test("SPARK-5582: empty log directory") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+    val logFile1 = newLogFile("app1", inProgress = true)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
       SparkListenerApplicationEnd(2L))
@@ -199,14 +217,13 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
 
   private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
     events: SparkListenerEvent*) = {
-    val out =
-      if (isNewFormat) {
-        EventLoggingListener.initEventLog(new FileOutputStream(file), codec)
-      } else {
-        val fileStream = new FileOutputStream(file)
-        codec.map(_.compressedOutputStream(fileStream)).getOrElse(fileStream)
-      }
-    val writer = new OutputStreamWriter(out, "UTF-8")
+    val fstream = new FileOutputStream(file)
+    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+    val bstream = new BufferedOutputStream(cstream)
+    if (isNewFormat) {
+      EventLoggingListener.initEventLog(bstream)
+    }
+    val writer = new OutputStreamWriter(bstream, "UTF-8")
     try {
       events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
     } finally {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 437d8693c0b1f2925ac481052aa366372a6f4e70..992dde66f982fd20e334544c0d69694e7cc9b9dd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, FileOutputStream, InputStream, IOException}
+import java.net.URI
 
 import scala.collection.mutable
 import scala.io.Source
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -78,7 +79,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
 
   test("Basic event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
-      testEventLogging(compressionCodec = Some(codec))
+      testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
     }
   }
 
@@ -88,25 +89,35 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
 
   test("End-to-end event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
-      testApplicationEventLogging(compressionCodec = Some(codec))
+      testApplicationEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
     }
   }
 
   test("Log overwriting") {
-    val log = new FileOutputStream(new File(testDir, "test"))
-    log.close()
-    try {
-      testEventLogging()
-      assert(false)
-    } catch {
-      case e: IOException =>
-        // Expected, since we haven't enabled log overwrite.
-    }
-
+    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
+    val logPath = new URI(logUri).getPath
+    // Create file before writing the event log
+    new FileOutputStream(new File(logPath)).close()
+    // Expected IOException, since we haven't enabled log overwrite.
+    intercept[IOException] { testEventLogging() }
     // Try again, but enable overwriting.
     testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true"))
   }
 
+  test("Event log name") {
+    // without compression
+    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
+    // with compression
+    assert(s"file:/base-dir/app1.lzf" ===
+      EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
+    // illegal characters in app ID
+    assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
+      EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
+    // illegal characters in app ID with compression
+    assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
+      EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
+  }
+
   /* ----------------- *
    * Actual test logic *
    * ----------------- */
@@ -140,15 +151,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
     eventLogger.stop()
 
     // Verify file contains exactly the two events logged
-    val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
-      fileSystem)
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
     try {
       val lines = readLines(logData)
-      assert(lines.size === 2)
-      assert(lines(0).contains("SparkListenerApplicationStart"))
-      assert(lines(1).contains("SparkListenerApplicationEnd"))
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 3)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerApplicationStart"))
+      assert(lines(2).contains("SparkListenerApplicationEnd"))
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart)
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd)
     } finally {
       logData.close()
     }
@@ -163,8 +176,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
     val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
+    val eventLogPath = eventLogger.logPath
     val expectedLogDir = testDir.toURI().toString()
-    assert(eventLogger.logPath.startsWith(expectedLogDir + "/"))
+    assert(eventLogPath === EventLoggingListener.getLogPath(
+      expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
 
     // Begin listening for events that trigger asserts
     val eventExistenceListener = new EventExistenceListener(eventLogger)
@@ -178,8 +193,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
     eventExistenceListener.assertAllCallbacksInvoked()
 
     // Make sure expected events exist in the log file.
-    val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
-      fileSystem)
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+    val logStart = SparkListenerLogStart(SPARK_VERSION)
     val lines = readLines(logData)
     val eventSet = mutable.Set(
       SparkListenerApplicationStart,
@@ -204,6 +219,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
         }
       }
     }
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
     assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 702c4cb3bdef902cb5b4b5db0815bbc495afc6b5..601694f57aad0f5098c0f0269cdccdadcb94bcef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
-      replayer.replay(logData, SPARK_VERSION, logFilePath.toString)
+      replayer.replay(logData, logFilePath.toString)
     } finally {
       logData.close()
     }
@@ -115,12 +115,12 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     assert(!eventLog.isDir)
 
     // Replay events
-    val (logData, version) = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
+    val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
     val eventMonster = new EventMonster(conf)
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
-      replayer.replay(logData, version, eventLog.getPath().toString)
+      replayer.replay(logData, eventLog.getPath().toString)
     } finally {
       logData.close()
     }
@@ -150,11 +150,4 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     override def start() { }
 
   }
-
-  private def getCompressionCodec(codecName: String) = {
-    val conf = new SparkConf
-    conf.set("spark.io.compression.codec", codecName)
-    CompressionCodec.createCodec(conf)
-  }
-
 }
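
Taken together, the user-facing behavior is: enable compression and the codec's short name
becomes the log file's extension. An end-to-end sketch (paths and app name hypothetical;
short codec names such as "lzf" are accepted by spark.io.compression.codec):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("event-log-demo")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/spark-events")
      .set("spark.eventLog.compress", "true")
      .set("spark.io.compression.codec", "lzf")

    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count()
    sc.stop()  // log finalized as /tmp/spark-events/<sanitized-app-id>.lzf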