diff --git a/.rat-excludes b/.rat-excludes index 874a6ee9f4043147502f5baae5cf17ea447cbf2e..8b5061415ff4c669655feba9f83c2688081acf32 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -73,12 +73,12 @@ logs .*dependency-reduced-pom.xml known_translations json_expectation -local-1422981759269/* -local-1422981780767/* -local-1425081759269/* -local-1426533911241/* -local-1426633911242/* -local-1430917381534/* +local-1422981759269 +local-1422981780767 +local-1425081759269 +local-1426533911241 +local-1426633911242 +local-1430917381534 local-1430917381535_1 local-1430917381535_2 DESCRIPTION diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 22e4155cc545281ed014104bfdc034e0c540ba77..9648959dbacb9a699c7e1d2b8d1bddd7b39fbe07 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -248,9 +248,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val logInfos: Seq[FileStatus] = statusList .filter { entry => try { - getModificationTime(entry).map { time => - time >= lastScanTime - }.getOrElse(false) + !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime) } catch { case e: AccessControlException => // Do not use "logInfo" since these messages can get pretty noisy if printed on @@ -261,9 +259,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } .flatMap { entry => Some(entry) } .sortWith { case (entry1, entry2) => - val mod1 = getModificationTime(entry1).getOrElse(-1L) - val mod2 = getModificationTime(entry2).getOrElse(-1L) - mod1 >= mod2 + entry1.getModificationTime() >= entry2.getModificationTime() } logInfos.grouped(20) @@ -341,19 +337,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get }.foreach { attempt => val logPath = new Path(logDir, attempt.logPath) - // If this is a legacy directory, then add the directory to the zipStream and add - // each file to that directory. - if (isLegacyLogDirectory(fs.getFileStatus(logPath))) { - val files = fs.listStatus(logPath) - zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/")) - zipStream.closeEntry() - files.foreach { file => - val path = file.getPath - zipFileToStream(path, attempt.logPath + Path.SEPARATOR + path.getName, zipStream) - } - } else { - zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream) - } + zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream) } } finally { zipStream.close() @@ -527,12 +511,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") - val logInput = - if (isLegacyLogDirectory(eventLog)) { - openLegacyEventLog(logPath) - } else { - EventLoggingListener.openEventLog(logPath, fs) - } + val logInput = EventLoggingListener.openEventLog(logPath, fs) try { val appListener = new ApplicationEventListener val appCompleted = isApplicationCompleted(eventLog) @@ -540,9 +519,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) bus.replay(logInput, logPath.toString, !appCompleted) // Without an app ID, new logs will render incorrectly in the listing page, so do not list or - // try to show their UI. Some old versions of Spark generate logs without an app ID, so let - // logs generated by those versions go through. - if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) { + // try to show their UI. + if (appListener.appId.isDefined) { Some(new FsApplicationAttemptInfo( logPath.getName(), appListener.appName.getOrElse(NOT_STARTED), @@ -550,7 +528,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appListener.appAttemptId, appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L), - getModificationTime(eventLog).get, + eventLog.getModificationTime(), appListener.sparkUser.getOrElse(NOT_STARTED), appCompleted)) } else { @@ -561,91 +539,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - /** - * Loads a legacy log directory. This assumes that the log directory contains a single event - * log file (along with other metadata files), which is the case for directories generated by - * the code in previous releases. - * - * @return input stream that holds one JSON record per line. - */ - private[history] def openLegacyEventLog(dir: Path): InputStream = { - val children = fs.listStatus(dir) - var eventLogPath: Path = null - var codecName: Option[String] = None - - children.foreach { child => - child.getPath().getName() match { - case name if name.startsWith(LOG_PREFIX) => - eventLogPath = child.getPath() - case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) => - codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length())) - case _ => - } - } - - if (eventLogPath == null) { - throw new IllegalArgumentException(s"$dir is not a Spark application log directory.") - } - - val codec = try { - codecName.map { c => CompressionCodec.createCodec(conf, c) } - } catch { - case e: Exception => - throw new IllegalArgumentException(s"Unknown compression codec $codecName.") - } - - val in = new BufferedInputStream(fs.open(eventLogPath)) - codec.map(_.compressedInputStream(in)).getOrElse(in) - } - - /** - * Return whether the specified event log path contains a old directory-based event log. - * Previously, the event log of an application comprises of multiple files in a directory. - * As of Spark 1.3, these files are consolidated into a single one that replaces the directory. - * See SPARK-2261 for more detail. - */ - private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory - - /** - * Returns the modification time of the given event log. If the status points at an empty - * directory, `None` is returned, indicating that there isn't an event log at that location. - */ - private def getModificationTime(fsEntry: FileStatus): Option[Long] = { - if (isLegacyLogDirectory(fsEntry)) { - val statusList = fs.listStatus(fsEntry.getPath) - if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None - } else { - Some(fsEntry.getModificationTime()) - } - } - /** * Return true when the application has completed. */ private def isApplicationCompleted(entry: FileStatus): Boolean = { - if (isLegacyLogDirectory(entry)) { - fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE)) - } else { - !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS) - } - } - - /** - * Returns whether the version of Spark that generated logs records app IDs. App IDs were added - * in Spark 1.1. - */ - private def sparkVersionHasAppId(entry: FileStatus): Boolean = { - if (isLegacyLogDirectory(entry)) { - fs.listStatus(entry.getPath()) - .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) } - .map { status => - val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length()) - version != "1.0" && version != "1.1" - } - .getOrElse(true) - } else { - true - } + !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS) } /** @@ -670,12 +568,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private[history] object FsHistoryProvider { val DEFAULT_LOG_DIR = "file:/tmp/spark-events" - - // Constants used to parse Spark 1.0.0 log directories. - val LOG_PREFIX = "EVENT_LOG_" - val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_" - val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_" - val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" } private class FsApplicationAttemptInfo( diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 36f2b74f948f18eed3bd8f3dea6d9c87053a9148..01fee46e73a809cf96121a890126d0cd87735e44 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -232,8 +232,6 @@ private[spark] object EventLoggingListener extends Logging { // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" val DEFAULT_LOG_DIR = "/tmp/spark-events" - val SPARK_VERSION_KEY = "SPARK_VERSION" - val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC" private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) diff --git a/core/src/test/resources/spark-events/local-1422981759269/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1422981759269 similarity index 100% rename from core/src/test/resources/spark-events/local-1422981759269/EVENT_LOG_1 rename to core/src/test/resources/spark-events/local-1422981759269 diff --git a/core/src/test/resources/spark-events/local-1422981759269/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1422981759269/APPLICATION_COMPLETE deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1422981759269/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1422981759269/SPARK_VERSION_1.2.0 deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1422981780767/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1422981780767 similarity index 100% rename from core/src/test/resources/spark-events/local-1422981780767/EVENT_LOG_1 rename to core/src/test/resources/spark-events/local-1422981780767 diff --git a/core/src/test/resources/spark-events/local-1422981780767/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1422981780767/APPLICATION_COMPLETE deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1422981780767/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1422981780767/SPARK_VERSION_1.2.0 deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1425081759269/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1425081759269 similarity index 100% rename from core/src/test/resources/spark-events/local-1425081759269/EVENT_LOG_1 rename to core/src/test/resources/spark-events/local-1425081759269 diff --git a/core/src/test/resources/spark-events/local-1425081759269/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1425081759269/APPLICATION_COMPLETE deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1425081759269/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1425081759269/SPARK_VERSION_1.2.0 deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1426533911241/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1426533911241 similarity index 100% rename from core/src/test/resources/spark-events/local-1426533911241/EVENT_LOG_1 rename to core/src/test/resources/spark-events/local-1426533911241 diff --git a/core/src/test/resources/spark-events/local-1426533911241/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1426533911241/APPLICATION_COMPLETE deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1426533911241/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1426533911241/SPARK_VERSION_1.2.0 deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1426633911242/EVENT_LOG_1 b/core/src/test/resources/spark-events/local-1426633911242 similarity index 100% rename from core/src/test/resources/spark-events/local-1426633911242/EVENT_LOG_1 rename to core/src/test/resources/spark-events/local-1426633911242 diff --git a/core/src/test/resources/spark-events/local-1426633911242/APPLICATION_COMPLETE b/core/src/test/resources/spark-events/local-1426633911242/APPLICATION_COMPLETE deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/resources/spark-events/local-1426633911242/SPARK_VERSION_1.2.0 b/core/src/test/resources/spark-events/local-1426633911242/SPARK_VERSION_1.2.0 deleted file mode 100755 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 6cbf911395a84f63b5420ee8aacf060801820c10..3baa2e2ddad31d5da563f39e26ba6448321c6b38 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -69,7 +69,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc new File(logPath) } - test("Parse new and old application logs") { + test("Parse application logs") { val provider = new FsHistoryProvider(createTestConf()) // Write a new-style application log. @@ -95,26 +95,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc None) ) - // Write an old-style application log. - val oldAppComplete = writeOldLog("old1", "1.0", None, true, - SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None), - SparkListenerApplicationEnd(3L) - ) - - // Check for logs so that we force the older unfinished app to be loaded, to make - // sure unfinished apps are also sorted correctly. - provider.checkForLogs() - - // Write an unfinished app, old-style. - val oldAppIncomplete = writeOldLog("old2", "1.0", None, false, - SparkListenerApplicationStart("old2", None, 2L, "test", None) - ) - - // Force a reload of data from the log directory, and check that both logs are loaded. + // Force a reload of data from the log directory, and check that logs are loaded. // Take the opportunity to check that the offset checks work as expected. updateAndCheck(provider) { list => - list.size should be (5) - list.count(_.attempts.head.completed) should be (3) + list.size should be (3) + list.count(_.attempts.head.completed) should be (2) def makeAppInfo( id: String, @@ -132,11 +117,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc newAppComplete.lastModified(), "test", true)) list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(), 1L, 4L, newAppCompressedComplete.lastModified(), "test", true)) - list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L, - oldAppComplete.lastModified(), "test", true)) - list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L, - -1L, oldAppIncomplete.lastModified(), "test", false)) - list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L, + list(2) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L, newAppIncomplete.lastModified(), "test", false)) // Make sure the UI can be rendered. @@ -148,38 +129,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } - test("Parse legacy logs with compression codec set") { - val provider = new FsHistoryProvider(createTestConf()) - val testCodecs = List((classOf[LZFCompressionCodec].getName(), true), - (classOf[SnappyCompressionCodec].getName(), true), - ("invalid.codec", false)) - - testCodecs.foreach { case (codecName, valid) => - val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null - val logDir = new File(testDir, codecName) - logDir.mkdir() - createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0")) - writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec), - SparkListenerApplicationStart("app2", None, 2L, "test", None), - SparkListenerApplicationEnd(3L) - ) - createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName)) - - val logPath = new Path(logDir.getAbsolutePath()) - try { - val logInput = provider.openLegacyEventLog(logPath) - try { - Source.fromInputStream(logInput).getLines().toSeq.size should be (2) - } finally { - logInput.close() - } - } catch { - case e: IllegalArgumentException => - valid should be (false) - } - } - } - test("SPARK-3697: ignore directories that cannot be read.") { val logFile1 = newLogFile("new1", None, inProgress = false) writeFile(logFile1, true, None, @@ -395,21 +344,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc SparkListenerLogStart("1.4") ) - // Write a 1.2 log file with no start event (= no app id), it should be ignored. - writeOldLog("v12Log", "1.2", None, false) - - // Write 1.0 and 1.1 logs, which don't have app ids. - writeOldLog("v11Log", "1.1", None, true, - SparkListenerApplicationStart("v11Log", None, 2L, "test", None), - SparkListenerApplicationEnd(3L)) - writeOldLog("v10Log", "1.0", None, true, - SparkListenerApplicationStart("v10Log", None, 2L, "test", None), - SparkListenerApplicationEnd(4L)) - updateAndCheck(provider) { list => - list.size should be (2) - list(0).id should be ("v10Log") - list(1).id should be ("v11Log") + list.size should be (0) } } @@ -499,25 +435,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) } - private def writeOldLog( - fname: String, - sparkVersion: String, - codec: Option[CompressionCodec], - completed: Boolean, - events: SparkListenerEvent*): File = { - val log = new File(testDir, fname) - log.mkdir() - - val oldEventLog = new File(log, LOG_PREFIX + "1") - createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion)) - writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*) - if (completed) { - createEmptyFile(new File(log, APPLICATION_COMPLETE)) - } - - log - } - private class SafeModeTestProvider(conf: SparkConf, clock: Clock) extends FsHistoryProvider(conf, clock) { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index be55b2e0fe1b7e373f35265e4eb44b91d33fc049..40d0076eecfc8e299befa612c85186f09e81be4c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -176,18 +176,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers (1 to 2).foreach { attemptId => doDownloadTest("local-1430917381535", Some(attemptId)) } } - test("download legacy logs - all attempts") { - doDownloadTest("local-1426533911241", None, legacy = true) - } - - test("download legacy logs - single attempts") { - (1 to 2). foreach { - attemptId => doDownloadTest("local-1426533911241", Some(attemptId), legacy = true) - } - } - // Test that the files are downloaded correctly, and validate them. - def doDownloadTest(appId: String, attemptId: Option[Int], legacy: Boolean = false): Unit = { + def doDownloadTest(appId: String, attemptId: Option[Int]): Unit = { val url = attemptId match { case Some(id) => @@ -205,22 +195,13 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers var entry = zipStream.getNextEntry entry should not be null val totalFiles = { - if (legacy) { - attemptId.map { x => 3 }.getOrElse(6) - } else { - attemptId.map { x => 1 }.getOrElse(2) - } + attemptId.map { x => 1 }.getOrElse(2) } var filesCompared = 0 while (entry != null) { if (!entry.isDirectory) { val expectedFile = { - if (legacy) { - val splits = entry.getName.split("/") - new File(new File(logDir, splits(0)), splits(1)) - } else { - new File(logDir, entry.getName) - } + new File(logDir, entry.getName) } val expected = Files.toString(expectedFile, Charsets.UTF_8) val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8)