Skip to content
Snippets Groups Projects
Commit 5687bab8 authored by Marcelo Vanzin's avatar Marcelo Vanzin Committed by Andrew Or
Browse files

[SPARK-5600] [core] Clean up FsHistoryProvider test, fix app sort order.

Clean up some test setup code to remove duplicate instantiation of the
provider. Also make sure unfinished apps are sorted correctly.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #4370 from vanzin/SPARK-5600 and squashes the following commits:

0d048d5 [Marcelo Vanzin] Cleanup test code a bit.
2585119 [Marcelo Vanzin] Review feedback.
8b97544 [Marcelo Vanzin] Merge branch 'master' into SPARK-5600
be979e9 [Marcelo Vanzin] Merge branch 'master' into SPARK-5600
298371c [Marcelo Vanzin] [SPARK-5600] [core] Clean up FsHistoryProvider test, fix app sort order.
parent ca66159a
No related branches found
No related tags found
No related merge requests found
......@@ -194,7 +194,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
None
}
}
.sortBy { info => (-info.endTime, -info.startTime) }
.sortWith(compareAppInfo)
lastModifiedTime = newLastModifiedTime
......@@ -214,7 +214,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val newIterator = logInfos.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
if (newIterator.head.endTime > oldIterator.head.endTime) {
if (compareAppInfo(newIterator.head, oldIterator.head)) {
addIfAbsent(newIterator.next)
} else {
addIfAbsent(oldIterator.next)
......@@ -230,6 +230,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
}
}
/**
* Comparison function that defines the sort order for the application listing.
*
* @return Whether `i1` should precede `i2`.
*/
private def compareAppInfo(
i1: FsApplicationHistoryInfo,
i2: FsApplicationHistoryInfo): Boolean = {
if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
}
/**
* Replays the events in the specified log file and returns information about the associated
* application.
......
......@@ -37,13 +37,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
private var testDir: File = null
private var provider: FsHistoryProvider = null
before {
testDir = Utils.createTempDir()
provider = new FsHistoryProvider(new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.history.fs.updateInterval", "0"))
}
after {
......@@ -51,40 +46,41 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("Parse new and old application logs") {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.history.fs.updateInterval", "0")
val provider = new FsHistoryProvider(conf)
val provider = new FsHistoryProvider(createTestConf())
// Write a new-style application log.
val logFile1 = new File(testDir, "new1")
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", None, 1L, "test"),
SparkListenerApplicationEnd(2L)
val newAppComplete = new File(testDir, "new1")
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
SparkListenerApplicationEnd(4L)
)
// Write an unfinished app, new-style.
val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app2-2", None, 1L, "test")
val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
)
// Write an old-style application log.
val oldLog = new File(testDir, "old1")
oldLog.mkdir()
createEmptyFile(new File(oldLog, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldLog, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("app3", None, 2L, "test"),
val oldAppComplete = new File(testDir, "old1")
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldLog, provider.APPLICATION_COMPLETE))
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
// Check for logs so that we force the older unfinished app to be loaded, to make
// sure unfinished apps are also sorted correctly.
provider.checkForLogs()
// Write an unfinished app, old-style.
val oldLog2 = new File(testDir, "old2")
oldLog2.mkdir()
createEmptyFile(new File(oldLog2, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldLog2, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("app4", None, 2L, "test")
val oldAppIncomplete = new File(testDir, "old2")
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
)
// Force a reload of data from the log directory, and check that both logs are loaded.
......@@ -96,14 +92,14 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.size should be (4)
list.count(e => e.completed) should be (2)
list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
oldLog.lastModified(), "test", true))
list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
logFile1.lastModified(), "test", true))
list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L,
oldLog2.lastModified(), "test", false))
list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L,
logFile2.lastModified(), "test", false))
list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
newAppComplete.lastModified(), "test", true))
list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
-1L, oldAppIncomplete.lastModified(), "test", false))
list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
-1L, newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered.
list.foreach { case info =>
......@@ -113,6 +109,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("Parse legacy logs with compression codec set") {
val provider = new FsHistoryProvider(createTestConf())
val testCodecs = List((classOf[LZFCompressionCodec].getName(), true),
(classOf[SnappyCompressionCodec].getName(), true),
("invalid.codec", false))
......@@ -156,10 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
)
logFile2.setReadable(false, false)
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.history.fs.updateInterval", "0")
val provider = new FsHistoryProvider(conf)
val provider = new FsHistoryProvider(createTestConf())
provider.checkForLogs()
val list = provider.getListing().toSeq
......@@ -168,10 +162,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("history file is renamed from inprogress to completed") {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.testing", "true")
val provider = new FsHistoryProvider(conf)
val provider = new FsHistoryProvider(createTestConf())
val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
......@@ -191,9 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}
test("SPARK-5582: empty log directory") {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
val provider = new FsHistoryProvider(conf)
val provider = new FsHistoryProvider(createTestConf())
val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
......@@ -229,4 +218,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
new FileOutputStream(file).close()
}
private def createTestConf(): SparkConf = {
new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment